baldur_pro.services.canary — Canary Rollout
Progressive canary rollouts with safety interlocks: CanaryRolloutService,
CanaryFeatureFlag, and CanarySafetyInterlock.
🔒 PRO Feature — requires a baldur-pro license
These symbols ship in the baldur-pro distribution. PRO modules import
normally — there is no ImportError. PRO features activate only when
baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system
runs with OSS defaults and register_pro_services() logs
entitlement.pro_registration_skipped.
canary
Canary Rollout Module.
Gradual rollout and automatic rollback system for configuration changes.
Reference: docs/baldur/middleware_system/71_CANARY_CONFIG_ROLLOUT.md
Components
- models.py: CanaryState, CanaryStage, CanaryRollout, CanaryMetrics, PassCriteria
- service.py: CanaryRolloutService (thin orchestrator), get_canary_rollout_service
- state_machine.py: RolloutStateMachine (state transition rules)
- cluster_applicator.py: ClusterApplicator (per-cluster config application)
- evaluator.py: CanaryEvaluator (Shadow + Live evaluation)
- serializer.py: RolloutSerializer (serialization/deserialization)
- metrics_collector.py: CanaryMetricsCollector (metrics collection)
- locking.py: CanaryConfigLock, ConfigLockError
- versioning.py: VersionChecker, check_version_and_rollback, VersionConflictError
- chaos_guard.py: CanaryChaosGuard, ChaosConflictPolicy, ChaosConflictResult
- audit.py: log_canary_action, log_canary_error
- cross_cluster.py: CrossClusterNotifier, CrossClusterPropagationRequest, GovernancePolicySync
- feature_flag.py: CanaryFeatureFlag, CanaryFlagConfig, CanaryConfigMiddleware
Usage
```python
from baldur_pro.services.canary import (
    # Data Models
    CanaryState,
    CanaryStage,
    CanaryRollout,
    CanaryMetrics,
    PassCriteria,
    # Service
    get_canary_rollout_service,
    CanaryRolloutService,
    # Locking
    CanaryConfigLock,
    ConfigLockError,
    # Versioning
    VersionConflictError,
    check_version_and_rollback,
    # Chaos Guard
    CanaryChaosGuard,
    ChaosConflictPolicy,
)
```
Example: Create and start a rollout
```python
service = get_canary_rollout_service()
rollout = service.create_rollout(
    config_type="circuit_breaker",
    new_values={"failure_threshold": 3},
    stages=[
        CanaryStage(name="canary", clusters=["seoul-canary"], percentage=10),
    ],
    created_by="[email protected]",
)
service.start_rollout(rollout.id)
```
Status: Public
CanaryStage
dataclass
CanaryStage(
name: str,
clusters: list[str],
percentage: float,
duration_minutes: int = 5,
auto_promote: bool = True,
pass_criteria: PassCriteria = PassCriteria(),
error_rate_threshold: float = 0.05,
latency_increase_threshold: float = 0.5,
)
One stage in a multi-stage canary rollout plan.
A rollout is composed of stages; each stage applies the new configuration to a subset of clusters and waits for the stage duration plus pass-criteria evaluation before promoting.
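Because each stage waits at least its duration_minutes before it can be promoted, a plan's minimum wall-clock time is roughly the sum of its stage durations. The sketch below uses a hypothetical stand-in dataclass (StageSketch) that mirrors a subset of the documented CanaryStage fields, since the real class requires a PRO license; it assumes stages run sequentially and ignores evaluation time and manual approvals.

```python
from dataclasses import dataclass


@dataclass
class StageSketch:
    # Hypothetical stand-in mirroring a subset of the CanaryStage fields.
    name: str
    clusters: list[str]
    percentage: float
    duration_minutes: int = 5
    auto_promote: bool = True


def minimum_rollout_minutes(stages: list[StageSketch]) -> int:
    """Lower bound on rollout time, assuming stages run one after another.

    Pass-criteria evaluation and manual promotion (auto_promote=False)
    add time on top of this bound.
    """
    return sum(stage.duration_minutes for stage in stages)


plan = [
    StageSketch("canary", ["seoul-canary"], percentage=10),
    StageSketch("regional", ["seoul", "tokyo"], percentage=50, duration_minutes=15),
    StageSketch(
        "global",
        ["seoul", "tokyo", "singapore"],
        percentage=100,
        duration_minutes=30,
        auto_promote=False,  # require a human to confirm the final stage
    ),
]
```

With these values, minimum_rollout_minutes(plan) is 5 + 15 + 30 = 50 minutes.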
CanaryState
Bases: str, Enum
Canary rollout state machine.
State transitions:
- CREATED: initial, not yet started
- CANARY: applied to a subset of clusters
- PROMOTING: advancing to the next stage
- PAUSED: temporarily halted (manual or automatic)
- COMPLETED: fully rolled out to all clusters
- ROLLED_BACK: rollback finished
- FAILED: failure state
- CANCELLED: cancelled by operator
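The state list above can be read as a transition table. The sketch below is hypothetical: the authoritative rules live in RolloutStateMachine (state_machine.py), and both the enum string values and the exact edges in ALLOWED_TRANSITIONS are assumptions chosen to illustrate the idea that COMPLETED, ROLLED_BACK, FAILED and CANCELLED are terminal.

```python
from enum import Enum


class RolloutState(str, Enum):
    # Mirrors the documented CanaryState members; string values are assumed.
    CREATED = "created"
    CANARY = "canary"
    PROMOTING = "promoting"
    PAUSED = "paused"
    COMPLETED = "completed"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"
    CANCELLED = "cancelled"


S = RolloutState

# Hypothetical transition map for illustration only.
ALLOWED_TRANSITIONS: dict[RolloutState, frozenset[RolloutState]] = {
    S.CREATED: frozenset({S.CANARY, S.CANCELLED}),
    S.CANARY: frozenset({S.PROMOTING, S.PAUSED, S.ROLLED_BACK, S.FAILED, S.CANCELLED}),
    S.PROMOTING: frozenset({S.CANARY, S.COMPLETED, S.PAUSED, S.ROLLED_BACK, S.FAILED}),
    S.PAUSED: frozenset({S.CANARY, S.ROLLED_BACK, S.CANCELLED}),
    S.COMPLETED: frozenset(),
    S.ROLLED_BACK: frozenset(),
    S.FAILED: frozenset(),
    S.CANCELLED: frozenset(),
}

# States with no outgoing transitions are terminal.
TERMINAL_STATES = frozenset(s for s, nxt in ALLOWED_TRANSITIONS.items() if not nxt)


def can_transition(current: RolloutState, target: RolloutState) -> bool:
    return target in ALLOWED_TRANSITIONS[current]
```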
PassCriteria
dataclass
PassCriteria(
error_rate_absolute_max: float = 0.05,
error_rate_increase_max: float = 0.01,
latency_p95_delta_ms: float = 50.0,
latency_p99_delta_pct: float = 0.2,
error_budget_drain_rate_max: float = 1.2,
error_budget_remaining_min: float = 0.1,
min_requests_required: int = 100,
evaluation_window_seconds: int = 300,
)
Promotion thresholds for canary stage evaluation.
Pure threshold DTO consumed by LiveCanaryEvaluator. The evaluator owns the comparison logic; this class holds the values only.
for_tier
classmethod
for_tier(tier_id: str) -> PassCriteria
Return tier-default PassCriteria for critical/standard/non_essential.
CanaryChaosGuard
CanaryChaosGuard(policy: ChaosConflictPolicy = None)
Defense against conflicts between Canary Rollout and Chaos experiments.
Applying a Canary rollout to a cluster where a chaos experiment is running makes it impossible to tell whether a metric change is caused by the configuration change or by the chaos. This guard detects the conflict and handles it according to the policy.
Default policy: SMART (exclude only the clusters under experiment)
Example
```python
from baldur_pro.services.canary import CanaryChaosGuard, ChaosConflictPolicy

guard = CanaryChaosGuard(policy=ChaosConflictPolicy.SMART)

result = guard.check_conflict(
    target_clusters=["seoul", "tokyo", "singapore"],
    force_during_chaos=False,
)

if result.has_conflict:
    # logger: a structured logger that accepts keyword fields
    logger.warning(
        "chaos_clusters_excluded",
        chaos_clusters=result.chaos_clusters,
    )

if not result.can_proceed:
    raise ValueError("All target clusters have active chaos")

# Proceed with the rollout on result.safe_clusters
# (proceed_with_clusters is a placeholder for your own rollout call)
proceed_with_clusters(result.safe_clusters)
```
Initialize CanaryChaosGuard.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| policy | ChaosConflictPolicy | Conflict policy (default: SMART) | None |
policy
property
writable
policy: ChaosConflictPolicy
Look up the current policy.
check_conflict
check_conflict(
target_clusters: list[str],
force_during_chaos: bool = False,
) -> ChaosConflictResult
Check for chaos experiment conflicts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_clusters | list[str] | Rollout target clusters | required |
| force_during_chaos | bool | If True, proceed despite a conflict and only log a warning | False |
Returns:
| Type | Description |
|---|---|
| ChaosConflictResult | Conflict check result |
Note
Use force_during_chaos=True only in emergencies. While chaos is active, metric changes cannot be attributed to the config change, so metrics-based automatic rollback may misfire.
ChaosConflictPolicy
Bases: str, Enum
Chaos conflict policy.
How to handle the case where a chaos experiment is running on a target rollout cluster.
Values
- STRICT: Fully block the Canary while a chaos experiment is running
- SMART: Proceed while excluding only the clusters under chaos (default)
- LOOSE: Proceed with all clusters after a warning (not recommended)
ChaosConflictResult
dataclass
ChaosConflictResult(
has_conflict: bool,
chaos_clusters: list[str],
safe_clusters: list[str],
policy_applied: ChaosConflictPolicy,
can_proceed: bool,
warning_message: str | None = None,
)
Conflict check result.
Attributes:
| Name | Type | Description |
|---|---|---|
| has_conflict | bool | Whether a conflict occurred |
| chaos_clusters | list[str] | List of clusters under a chaos experiment |
| safe_clusters | list[str] | List of clusters available for rollout |
| policy_applied | ChaosConflictPolicy | Applied policy |
| can_proceed | bool | Whether the rollout can proceed |
| warning_message | str \| None | Warning message (on conflict) |
ClusterApplicator
ClusterApplicator(rollout_ttl_days: int = 7)
Applies canary config to individual clusters via CacheProviderInterface.
This handles LOCAL cluster cache key storage only. For cross-cluster synchronization, see cross_cluster.py.
Initialize the ClusterApplicator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_ttl_days | int | TTL for cluster config keys, in days | 7 |
cache
property
cache: CacheProviderInterface | None
CacheProviderInterface (lazy loading via ProviderRegistry).
apply_to_clusters
apply_to_clusters(
rollout: CanaryRollout, clusters: list[str]
) -> None
Apply config to multiple clusters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout | CanaryRollout | The canary rollout containing the config to apply | required |
| clusters | list[str] | List of cluster IDs to apply the config to | required |
apply_config_to_cluster
apply_config_to_cluster(
cluster: str, config_type: str, values: dict[str, Any]
) -> None
Apply config to a specific cluster via cache provider.
The implementation stores the config under a cluster-specific cache key. The sync mechanism depends on the deployment:
- Same Redis: namespace-based key storage
- Separate Redis: per-cluster Redis connection
- API-based: cluster API call
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cluster | str | Cluster ID. | required |
| config_type | str | Config type (circuit_breaker, dlq, retry, etc.). | required |
| values | dict[str, Any] | Config values to apply. | required |
ClusterConfigChange
dataclass
ClusterConfigChange(
config_type: str,
previous_value: dict[str, Any],
new_value: dict[str, Any],
changed_by: str = "",
changed_at: datetime = utc_now(),
rollout_id: str | None = None,
reason: str = "",
)
Bases: SerializableMixin
Configuration change information.
Attributes:
| Name | Type | Description |
|---|---|---|
| config_type | str | config type (circuit_breaker, dlq, retry, etc.) |
| previous_value | dict[str, Any] | previous config value |
| new_value | dict[str, Any] | new config value |
| changed_by | str | who changed it |
| changed_at | datetime | change time |
| rollout_id | str \| None | related Canary Rollout ID (if any) |
| reason | str | change reason |
from_dict
classmethod
from_dict(data: dict[str, Any]) -> ClusterConfigChange
Create from a dictionary.
CrossClusterNotifier
CrossClusterNotifier(
current_cluster: str | None = None,
other_clusters: list[str] | None = None,
notification_backend: NotificationBackend | None = None,
default_channel: str | None = None,
)
Cross-cluster notifier.
Sends a notification to the operators of other clusters on a config change. Does not apply automatically; only shares information.
Attributes:
| Name | Description |
|---|---|
| current_cluster | current cluster ID |
| other_clusters | list of clusters to notify |
| notification_backend | notification delivery backend |
Example
```python
notifier = CrossClusterNotifier(
    current_cluster="cluster-a",
    other_clusters=["cluster-b", "cluster-c"],
)

notifier.notify_config_change(ClusterConfigChange(
    config_type="circuit_breaker",
    previous_value={"failure_threshold": 5},
    new_value={"failure_threshold": 3},
    changed_by="[email protected]",
))
```
Initialize CrossClusterNotifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| current_cluster | str \| None | current cluster ID (defaults to the BALDUR_NAMESPACE env var) | None |
| other_clusters | list[str] \| None | list of clusters to notify | None |
| notification_backend | NotificationBackend \| None | notification delivery backend (default: logging) | None |
| default_channel | str \| None | default notification channel | None |
DEFAULT_CHANNEL
property
DEFAULT_CHANNEL: str
Get default channel from SlackChannelSettings.
notify_config_change
notify_config_change(
change: ClusterConfigChange,
result: str = "success",
channel: str | None = None,
) -> dict[str, bool]
Notify the operators of other clusters about a config change.
Does not apply automatically; only shares information.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| change | ClusterConfigChange | config change information | required |
| result | str | rollout result ("success", "rolled_back", etc.) | 'success' |
| channel | str \| None | notification channel (None to use the default) | None |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dictionary of notification success per cluster |
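The dict[str, bool] return value implies one delivery attempt per cluster, with a failure on one cluster not stopping the others. The sketch below illustrates that fan-out under assumptions: the backend signature (channel, cluster, message) -> bool, the message format, and the "#canary-ops" channel are hypothetical, and the default backend just logs, echoing the documented logging default.

```python
import logging
from typing import Callable

logger = logging.getLogger("canary.cross_cluster")

# Hypothetical backend signature: (channel, cluster, message) -> delivered?
Backend = Callable[[str, str, str], bool]


def logging_backend(channel: str, cluster: str, message: str) -> bool:
    logger.info("[%s] -> %s: %s", channel, cluster, message)
    return True


def notify_config_change(
    current_cluster: str,
    other_clusters: list[str],
    summary: str,
    result: str = "success",
    channel: str = "#canary-ops",  # placeholder channel name
    backend: Backend = logging_backend,
) -> dict[str, bool]:
    message = f"{current_cluster}: {summary} (result={result})"
    outcome: dict[str, bool] = {}
    for cluster in other_clusters:
        try:
            outcome[cluster] = bool(backend(channel, cluster, message))
        except Exception:
            # One unreachable cluster must not block notifying the rest.
            logger.exception("notification to %s failed", cluster)
            outcome[cluster] = False
    return outcome
```

Nothing is applied on the receiving side; the return value only tells the caller which operators were reached.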
add_cluster
add_cluster(cluster: str) -> None
Add a cluster to the notification targets.
remove_cluster
remove_cluster(cluster: str) -> None
Remove a cluster from the notification targets.
CrossClusterPropagationRequest
CrossClusterPropagationRequest(
store: CrossClusterStore | None = None,
notification_backend: NotificationBackend | None = None,
default_expiry_hours: int = 24,
on_apply: (
Callable[[PropagationRequest], bool] | None
) = None,
default_channel: str | None = None,
redis_client: Any = None,
)
Cross-cluster config propagation request (manual approval).
A safe approach where propagating a config to another cluster requires the approval of the target cluster's operator before it is applied.
Workflow:
1. Create a request (request_propagation)
2. Send an approval-request notification to the target cluster
3. The operator calls approve() or reject()
4. On approval, run the config-apply callback
Attributes:
| Name | Description |
|---|---|
| notification_backend | notification delivery backend |
| default_expiry_hours | request expiration time (default 24 hours) |
| on_apply | config-apply callback function |
Example
```python
requester = CrossClusterPropagationRequest()

# Create a request (config_change is a ClusterConfigChange)
request_id = requester.request_propagation(
    source_cluster="cluster-a",
    target_clusters=["cluster-b"],
    change=config_change,
)

# (On the other cluster) approve
requester.approve(request_id, approved_by="[email protected]")
```
Initialize CrossClusterPropagationRequest.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| store | CrossClusterStore \| None | CrossClusterStore instance (auto-resolved if None) | None |
| notification_backend | NotificationBackend \| None | notification delivery backend | None |
| default_expiry_hours | int | request expiration time, in hours | 24 |
| on_apply | Callable[[PropagationRequest], bool] \| None | config-apply callback function | None |
| default_channel | str \| None | default notification channel | None |
| redis_client | Any | Deprecated; use the store parameter instead | None |
store
property
store
CrossClusterStore (lazy loading via ProviderRegistry).
request_propagation
request_propagation(
source_cluster: str,
target_clusters: list[str],
change: ClusterConfigChange,
expiry_hours: int | None = None,
) -> str
Create a config propagation request to other clusters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_cluster | str | the requesting source cluster | required |
| target_clusters | list[str] | list of target clusters | required |
| change | ClusterConfigChange | the config change to propagate | required |
| expiry_hours | int \| None | request expiration time | None |
Returns:
| Type | Description |
|---|---|
| str | Request ID (the change is applied only after the target cluster's operator approves) |
approve
approve(
request_id: str, approved_by: str
) -> tuple[bool, str | None]
Approve a config propagation request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request_id | str | request ID | required |
| approved_by | str | approver | required |
Returns:
| Type | Description |
|---|---|
| tuple[bool, str \| None] | (success, error message) |
reject
reject(
request_id: str, rejected_by: str, reason: str = ""
) -> tuple[bool, str | None]
Reject a config propagation request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request_id | str | request ID | required |
| rejected_by | str | rejecter | required |
| reason | str | rejection reason | '' |
Returns:
| Type | Description |
|---|---|
| tuple[bool, str \| None] | (success, error message) |
get_request
get_request(request_id: str) -> PropagationRequest | None
Query a request.
get_pending_requests
get_pending_requests(
cluster: str,
) -> list[PropagationRequest]
Query the list of pending requests for a cluster.
GovernancePolicy
dataclass
GovernancePolicy(
policy_id: str,
config_type: str,
rules: list[dict[str, Any]] = list(),
version: int = 1,
created_by: str = "",
created_at: datetime = utc_now(),
)
Bases: SerializableMixin
Governance policy.
Defines the allowed range of config values. This policy is synchronized automatically (it is not the actual config value).
Attributes:
| Name | Type | Description |
|---|---|---|
| policy_id | str | policy ID |
| config_type | str | config type |
| rules | list[dict[str, Any]] | list of policy rules, e.g. {"max": 5, "min": 1, "field": "retry_max_attempts"} |
| version | int | policy version |
| created_by | str | creator |
| created_at | datetime | creation time |
validate_config
validate_config(
config: dict[str, Any],
) -> tuple[bool, list[str]]
Validate whether the config values comply with the policy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | config values to validate | required |
Returns:
| Type | Description |
|---|---|
| tuple[bool, list[str]] | (compliant, list of violation messages) |
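Given the documented rule shape ({"field": ..., "min": ..., "max": ...}), validation can be sketched as a min/max bounds check per rule that collects every violation rather than stopping at the first. The function below is a sketch, not GovernancePolicy.validate_config; skipping fields absent from config and the wording of the messages are assumptions.

```python
from typing import Any


def validate_config(rules: list[dict[str, Any]], config: dict[str, Any]) -> tuple[bool, list[str]]:
    violations: list[str] = []
    for rule in rules:
        name = rule["field"]
        if name not in config:
            continue  # assumption: only fields present in config are checked
        value = config[name]
        if "min" in rule and value < rule["min"]:
            violations.append(f"{name}={value} is below min {rule['min']}")
        if "max" in rule and value > rule["max"]:
            violations.append(f"{name}={value} is above max {rule['max']}")
    return not violations, violations


# Rules taken from the GovernancePolicySync example in this reference.
rules = [
    {"field": "failure_threshold", "min": 2, "max": 10},
    {"field": "reset_timeout_seconds", "max": 300},
]
```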
GovernancePolicySync
GovernancePolicySync(
clusters: list[str] | None = None,
store: CrossClusterStore | None = None,
notification_backend: NotificationBackend | None = None,
redis_client: Any = None,
)
Governance policy synchronization (automatic).
Synchronizes only governance policies automatically. These are not the actual config values, but the allowed range of config values (upper/lower bounds).
Synchronized:
- retry_max_attempts <= 5 (upper bound)
- failure_threshold >= 2 (lower bound)

Not synchronized:
- Actual config values (Blast Radius risk)
Attributes:
| Name | Description |
|---|---|
| clusters | list of clusters to synchronize |
| notification_backend | notification delivery backend |
Example
```python
policy_sync = GovernancePolicySync(
    clusters=["cluster-a", "cluster-b"],
)

policy = GovernancePolicy(
    policy_id="circuit-breaker-limits",
    config_type="circuit_breaker",
    rules=[
        {"field": "failure_threshold", "min": 2, "max": 10},
        {"field": "reset_timeout_seconds", "max": 300},
    ],
)

policy_sync.sync_policy(policy)
```
Initialize GovernancePolicySync.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| clusters | list[str] \| None | list of clusters to synchronize | None |
| store | CrossClusterStore \| None | CrossClusterStore instance (auto-resolved if None) | None |
| notification_backend | NotificationBackend \| None | notification delivery backend | None |
| redis_client | Any | Deprecated; use the store parameter instead | None |
store
property
store
CrossClusterStore (lazy loading via ProviderRegistry).
sync_policy
sync_policy(policy: GovernancePolicy) -> dict[str, bool]
Synchronize a governance policy.
Synchronizes the policy to all target clusters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| policy | GovernancePolicy | the governance policy to synchronize | required |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Synchronization success per cluster |
get_policy
get_policy(config_type: str) -> GovernancePolicy | None
Query a policy.
validate_config_against_policy
validate_config_against_policy(
config_type: str, config: dict[str, Any]
) -> tuple[bool, list[str]]
Validate whether the config values comply with the governance policy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | config type | required |
| config | dict[str, Any] | config values to validate | required |
Returns:
| Type | Description |
|---|---|
| tuple[bool, list[str]] | (compliant, list of violation messages) |
PropagationRequest
dataclass
PropagationRequest(
request_id: str,
source_cluster: str,
target_cluster: str,
config_change: ClusterConfigChange,
status: PropagationRequestStatus = PropagationRequestStatus.PENDING_APPROVAL,
created_at: datetime = utc_now(),
expires_at: datetime | None = None,
approved_by: str | None = None,
approved_at: datetime | None = None,
rejected_by: str | None = None,
rejected_at: datetime | None = None,
reject_reason: str | None = None,
)
Bases: SerializableMixin
Config propagation request.
The config is applied once approved by the other cluster.
from_dict
classmethod
from_dict(data: dict[str, Any]) -> PropagationRequest
Create from a dictionary.
PropagationRequestStatus
Bases: str, Enum
Config propagation request status.
CanaryEvaluator
Shadow Evaluation Gate and Live Canary Evaluation.
Handles pre-start shadow evaluation checks and pre-promote live canary metric evaluation.
check_shadow_evaluation
check_shadow_evaluation(
rollout: CanaryRollout,
bypass_shadow: bool,
bypass_shadow_reason: str,
) -> bool | None
Check Shadow Evaluation result before starting a rollout.
Returns:
| Value | Description |
|---|---|
| True | Passed (can start) |
| False | Blocked (cannot start) |
| None | Shadow evaluation not run or disabled (skip check) |
check_live_canary_evaluation
check_live_canary_evaluation(
rollout: CanaryRollout, tier_id: str | None = None
) -> bool | None
Evaluate live canary node metrics before promotion.
Returns:
| Value | Description |
|---|---|
| True | Passed (can promote) |
| False | Blocked (cannot promote) |
| None | Disabled or error (skip check) |
check_error_budget_drain
check_error_budget_drain(
rollout: CanaryRollout, tier_id: str | None = None
) -> bool | None
Check error-budget drain rate before promotion (PRO-only).
Blocks promotion when the canary is burning error budget faster than the
stage's error_budget_drain_rate_max (using the larger of the 1h/6h
burn rate to catch both fast and sustained burn) OR when the remaining
budget is below error_budget_remaining_min. The effective criteria are
the stage's pass_criteria with the tier floor applied, identical to
the live-evaluation path.
The drain check runs only when the Error Budget feature is enabled and wired to real stats. While the feature is OFF (the default) or unwired, the check skips (returns None) instead of evaluating simulated data that would always look healthy. When the feature is enabled, the per-stage PassCriteria values are the operator control; a stage can set a high drain limit to effectively opt out. PRO entitlement gating is inherited from module registration; no OSS import is introduced.
Returns:
| Value | Description |
|---|---|
| True | within drain limits (can promote) |
| False | burning too fast or below the remaining floor (blocked) |
| None | budget signal absent: feature OFF/unwired, or service unavailable/error (skip; fail-open) |
CanaryConfigMiddleware
CanaryConfigMiddleware(get_response: Callable)
Django middleware - Request-level Canary configuration application.
Performs the Canary decision at the start of the request and attaches the result to the request object.
Usage (settings.py):

```python
MIDDLEWARE = [
    ...
    'baldur_pro.services.canary.feature_flag.CanaryConfigMiddleware',
    ...
]
```

Access in views:

```python
def my_view(request):
    canary_decisions = getattr(request, 'canary_decisions', {})
    if canary_decisions.get('circuit_breaker', {}).get('use_canary'):
        # Use the Canary configuration
        pass
```
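The middleware follows Django's standard shape: __init__ receives get_response, and __call__ does per-request work before calling it. The sketch below shows that pattern without importing Django; CanaryDecisionMiddleware, its decide callable and config_types list are hypothetical (the real class takes only get_response and uses its feature_flag property), and a SimpleNamespace stands in for HttpRequest.

```python
from types import SimpleNamespace
from typing import Any, Callable


class CanaryDecisionMiddleware:
    """Django-style middleware: __init__(get_response) plus __call__(request)."""

    def __init__(
        self,
        get_response: Callable[[Any], Any],
        decide: Callable[[Any, str], bool],
        config_types: list[str],
    ) -> None:
        self.get_response = get_response
        self._decide = decide
        self._config_types = config_types

    def __call__(self, request: Any) -> Any:
        # Decide once per request, before the view runs, so every call
        # site in the view sees the same answer.
        request.canary_decisions = {
            config_type: {"use_canary": self._decide(request, config_type)}
            for config_type in self._config_types
        }
        return self.get_response(request)


# Demo wiring: a fake request and a "view" that echoes the decisions.
middleware = CanaryDecisionMiddleware(
    get_response=lambda request: request.canary_decisions,
    decide=lambda request, config_type: request.user_id == "user-1",
    config_types=["circuit_breaker"],
)
response = middleware(SimpleNamespace(user_id="user-1"))
```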
Initialize the middleware.
feature_flag
property
feature_flag: CanaryFeatureFlag
CanaryFeatureFlag singleton.
CanaryDecision
dataclass
CanaryDecision(
use_canary: bool,
reason: str,
strategy_used: str,
effective_config: dict[str, Any] = dict(),
)
Result of the Canary application decision.
Attributes:
| Name | Type | Description |
|---|---|---|
use_canary |
bool
|
Whether to use the Canary configuration |
reason |
str
|
Reason for the decision |
strategy_used |
str
|
Selection strategy used |
effective_config |
dict[str, Any]
|
Configuration values to be applied finally |
CanaryFeatureFlag
CanaryFeatureFlag(
context_extractor: (
RequestContextExtractor | None
) = None,
)
Request-level Canary Feature Flag.
Decides whether to apply the Canary configuration per request within a single cluster.
Key features:
- Probability-based Canary selection (consistency guaranteed)
- Whitelist-based forced application
- Header-based manual override
- Independent Canary management per configuration
Example
```python
feature_flag = CanaryFeatureFlag()

# Register Canary configuration
feature_flag.register_flag(CanaryFlagConfig(
    config_type="circuit_breaker",
    percentage=10,  # 10% of traffic
    baseline_config={"failure_threshold": 5},
    canary_config={"failure_threshold": 3},
))

# Decide Canary application per request
decision = feature_flag.evaluate(request, "circuit_breaker")

if decision.use_canary:
    # Use the Canary configuration
    apply_config(decision.effective_config)
```
Initialize CanaryFeatureFlag.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context_extractor | RequestContextExtractor \| None | Request context extractor | None |
register_flag
register_flag(config: CanaryFlagConfig) -> None
Register a Canary Flag.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | CanaryFlagConfig | Canary Flag configuration | required |
unregister_flag
unregister_flag(config_type: str) -> bool
Unregister a Canary Flag.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| bool | Whether unregistration succeeded |
get_flag
get_flag(config_type: str) -> CanaryFlagConfig | None
Look up a Canary Flag.
list_flags
list_flags() -> list[CanaryFlagConfig]
Return a list of all Canary Flags.
should_use_canary_config
should_use_canary_config(
request: HttpRequest, config_type: str
) -> bool
Decide whether the Canary configuration should be applied to the request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | HttpRequest | Django HTTP request | required |
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| bool | Whether to use the Canary configuration |
get_effective_config
get_effective_config(
request: HttpRequest,
config_type: str,
baseline_config: dict[str, Any] | None = None,
canary_config: dict[str, Any] | None = None,
) -> dict[str, Any]
Return the effective configuration to apply to the request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | HttpRequest | Django HTTP request | required |
| config_type | str | Configuration type | required |
| baseline_config | dict[str, Any] \| None | Baseline configuration (used if absent from the Flag) | None |
| canary_config | dict[str, Any] \| None | Canary configuration (used if absent from the Flag) | None |
|
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Configuration values to apply |
evaluate
evaluate(
request: HttpRequest, config_type: str
) -> CanaryDecision
Evaluate whether to apply the Canary configuration.
Decides whether the request belongs to the Canary group based on the strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | HttpRequest | Django HTTP request | required |
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| CanaryDecision | CanaryDecision result |
update_percentage
update_percentage(
config_type: str, new_percentage: float
) -> bool
Update the Canary application rate.
Used when gradually raising the rate while monitoring.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
| new_percentage | float | New rate (0-100) | required |
Returns:
| Type | Description |
|---|---|
| bool | Whether the update succeeded |
CanaryFlagConfig
dataclass
CanaryFlagConfig(
config_type: str,
enabled: bool = True,
percentage: float = 10.0,
strategy: CanarySelectionStrategy = CanarySelectionStrategy.USER_ID_HASH,
rollout_id: str | None = None,
whitelist_user_ids: set[str] = set(),
whitelist_ips: set[str] = set(),
canary_header: str = "X-Canary-Config",
baseline_config: dict[str, Any] = dict(),
canary_config: dict[str, Any] = dict(),
created_at: datetime = utc_now(),
expires_at: datetime | None = None,
)
Bases: SerializableMixin
Canary Feature Flag configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
| config_type | str | Configuration type (circuit_breaker, dlq, retry, etc.) |
| enabled | bool | Whether enabled |
| percentage | float | Canary application rate (0-100) |
| strategy | CanarySelectionStrategy | Selection strategy |
| rollout_id | str \| None | Related Canary Rollout ID |
| whitelist_user_ids | set[str] | Whitelisted user IDs |
| whitelist_ips | set[str] | Whitelisted IP addresses |
| canary_header | str | Header identifying a Canary request (for the HEADER_BASED strategy) |
| baseline_config | dict[str, Any] | Baseline configuration values |
| canary_config | dict[str, Any] | Canary configuration values |
| created_at | datetime | Creation time |
| expires_at | datetime \| None | Expiration time (optional) |
is_expired
is_expired() -> bool
Check whether expired.
from_dict
classmethod
from_dict(data: dict[str, Any]) -> CanaryFlagConfig
Create from a dictionary.
CanarySelectionStrategy
Bases: str, Enum
Canary selection strategy.
How to decide whether a request receives the Canary configuration.
CanarySafetyInterlock
CanarySafetyInterlock(
policy: dict[str, InterlockAction] | None = None,
fail_closed: bool = True,
emergency_tracker_factory: Callable | None = None,
)
Safety Interlock for Canary rollouts.
Controls Canary operations based on the Emergency Level.
Features:
- Emergency Level-based automatic braking
- Configurable policy (per-level action mapping)
- Fail-Closed policy (safe mode on backend failure)

Default policy:
- NORMAL (0): ALLOW
- LEVEL_1 (1): ALLOW_WITH_WARNING
- LEVEL_2 (2): PAUSE
- LEVEL_3 (3): ROLLBACK
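The default policy plus Fail-Closed handling can be sketched as a lookup with an exception path: if the Emergency Level cannot be read and fail_closed is True, the check behaves as if the level were LEVEL_3. The action values below are copied from the documented InterlockAction members; the lowercase level keys, the fail-open fallback to "normal", and blocking on an unknown level are assumptions. read_level stands in for the EmergencyTracker lookup.

```python
from collections.abc import Callable
from enum import Enum


class Action(str, Enum):
    # Values copied from the documented InterlockAction members.
    ALLOW = "allow"
    ALLOW_WITH_WARNING = "allow_with_warning"
    PAUSE = "pause"
    ROLLBACK = "rollback"
    BLOCK = "block"


# Level keys are assumed lowercase strings.
DEFAULT_POLICY: dict[str, Action] = {
    "normal": Action.ALLOW,
    "level_1": Action.ALLOW_WITH_WARNING,
    "level_2": Action.PAUSE,
    "level_3": Action.ROLLBACK,
}

ALLOWING = frozenset({Action.ALLOW, Action.ALLOW_WITH_WARNING})


def decide(
    read_level: Callable[[], str],
    policy: dict[str, Action] = DEFAULT_POLICY,
    fail_closed: bool = True,
) -> tuple[Action, bool, bool]:
    """Return (action, allowed, is_fail_closed)."""
    is_fail_closed = False
    try:
        level = read_level()
    except Exception:
        if fail_closed:
            level, is_fail_closed = "level_3", True  # treat failure as LEVEL_3
        else:
            level = "normal"  # assumed fail-open fallback
    action = policy.get(level, Action.BLOCK)  # unknown level: block (assumption)
    return action, action in ALLOWING, is_fail_closed


def backend_down() -> str:
    # Simulates an unreachable Emergency Level backend.
    raise ConnectionError("redis unavailable")
```

Failing closed trades availability for safety: a flaky backend can roll back a healthy canary, but it can never let a canary advance during an unseen emergency.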
Usage
```python
interlock = CanarySafetyInterlock()
rollout_id = "abc123"

# Check before the operation
result = interlock.check(operation="promote", rollout_id=rollout_id)

# canary_service: a CanaryRolloutService instance
if not result.allowed:
    if result.action == InterlockAction.PAUSE:
        canary_service.pause(rollout_id)
    elif result.action == InterlockAction.ROLLBACK:
        canary_service.rollback(rollout_id)
```
Initialize CanarySafetyInterlock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| policy | dict[str, InterlockAction] \| None | Per-Emergency-Level action policy (uses DEFAULT_POLICY if None) | None |
| fail_closed | bool | If True, treat a backend failure as LEVEL_3 (default: True) | True |
| emergency_tracker_factory | Callable \| None | EmergencyTracker factory function (for testing) | None |
check
check(
operation: str,
rollout_id: str | None = None,
namespace: str | None = None,
) -> InterlockResult
Safety Interlock check.
Looks up the current Emergency Level and decides the action per the policy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation | str | Operation to perform (start, promote, resume, etc.) | required |
| rollout_id | str \| None | Rollout ID (if any) | None |
| namespace | str \| None | Target namespace (current instance if None) | None |
Returns:
| Type | Description |
|---|---|
| InterlockResult | check result |
check_and_apply
check_and_apply(
canary_service: CanaryRolloutService,
rollout_id: str,
operation: str,
namespace: str | None = None,
) -> InterlockResult
Safety Interlock check and automatic application.
For a PAUSE or ROLLBACK action, applies it automatically via canary_service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| canary_service | CanaryRolloutService | CanaryRolloutService instance | required |
| rollout_id | str | Rollout ID | required |
| operation | str | Operation to perform | required |
| namespace | str \| None | Target namespace | None |
Returns:
| Type | Description |
|---|---|
| InterlockResult | check and application result |
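check_and_apply combines the check with the dispatch shown in the Usage example: PAUSE and ROLLBACK are applied through the service automatically. The sketch below isolates that dispatch step. It reuses the pause/rollback method names from the Usage example, typed through a hypothetical RolloutControl protocol, and returns whether the operation may proceed; Recorder is a test double, not part of the library.

```python
from typing import Protocol


class RolloutControl(Protocol):
    def pause(self, rollout_id: str) -> None: ...
    def rollback(self, rollout_id: str) -> None: ...


def apply_interlock_action(action: str, service: RolloutControl, rollout_id: str) -> bool:
    """Apply PAUSE/ROLLBACK via the service; return True if the operation may proceed."""
    if action == "pause":
        service.pause(rollout_id)
        return False
    if action == "rollback":
        service.rollback(rollout_id)
        return False
    # ALLOW and ALLOW_WITH_WARNING proceed; BLOCK (or anything unknown) does not.
    return action in ("allow", "allow_with_warning")


class Recorder:
    """Test double that records which control calls were made."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    def pause(self, rollout_id: str) -> None:
        self.calls.append(("pause", rollout_id))

    def rollback(self, rollout_id: str) -> None:
        self.calls.append(("rollback", rollout_id))
```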
InterlockAction
Bases: str, Enum
Safety Interlock action.
The action type the Interlock decides for a Canary rollout operation.
ALLOW
class-attribute
instance-attribute
ALLOW = 'allow'
Allow normal progress.
ALLOW_WITH_WARNING
class-attribute
instance-attribute
ALLOW_WITH_WARNING = 'allow_with_warning'
Allow progress with a warning log. Used at LEVEL_1.
PAUSE
class-attribute
instance-attribute
PAUSE = 'pause'
Pause. Used at LEVEL_2. Can resume after the situation is resolved.
ROLLBACK
class-attribute
instance-attribute
ROLLBACK = 'rollback'
Immediate rollback. Used at LEVEL_3 or Fail-Closed.
BLOCK
class-attribute
instance-attribute
BLOCK = 'block'
Block the operation. Cannot start a new rollout.
InterlockCheckFailure
Bases: str, Enum
Interlock check failure type.
Records the failure cause when the Fail-Closed policy is applied.
BACKEND_UNAVAILABLE
class-attribute
instance-attribute
BACKEND_UNAVAILABLE = 'backend_unavailable'
Redis/backend connection unavailable.
TRACKER_ERROR
class-attribute
instance-attribute
TRACKER_ERROR = 'tracker_error'
EmergencyModeTracker error.
TIMEOUT
class-attribute
instance-attribute
TIMEOUT = 'timeout'
Check timeout.
InterlockResult
dataclass
InterlockResult(
action: InterlockAction,
allowed: bool,
reason: str,
emergency_level: str = "normal",
emergency_level_name: str = "normal",
namespace: str = "",
delay_seconds: int = 0,
auto_resume_at: str | None = None,
check_failure: InterlockCheckFailure | None = None,
is_fail_closed: bool = False,
metadata: dict[str, Any] = field(default_factory=dict),
)
Safety Interlock check result.
Holds the Interlock check result for a Canary operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| action | InterlockAction | Action to perform (ALLOW, PAUSE, ROLLBACK, etc.) |
| allowed | bool | Whether to allow progress (True if action is ALLOW or ALLOW_WITH_WARNING) |
| reason | str | Reason for the action decision |
| emergency_level | str | Current Emergency level |
| namespace | str | Applied namespace |
| check_failure | InterlockCheckFailure \| None | Check failure type (None if normal) |
| is_fail_closed | bool | Whether the decision is due to the Fail-Closed policy |
action
instance-attribute
action: InterlockAction
Action to perform.
allowed
instance-attribute
allowed: bool
Whether to allow progress.
reason
instance-attribute
reason: str
Action reason.
emergency_level
class-attribute
instance-attribute
emergency_level: str = 'normal'
Current Emergency level value.
emergency_level_name
class-attribute
instance-attribute
emergency_level_name: str = 'normal'
Emergency level name.
namespace
class-attribute
instance-attribute
namespace: str = ''
Applied namespace.
delay_seconds
class-attribute
instance-attribute
delay_seconds: int = 0
Delay in seconds (for PAUSE).
auto_resume_at
class-attribute
instance-attribute
auto_resume_at: str | None = None
Automatic resume time (if any).
check_failure
class-attribute
instance-attribute
check_failure: InterlockCheckFailure | None = None
Check failure type (None if normal).
is_fail_closed
class-attribute
instance-attribute
is_fail_closed: bool = False
Whether blocked due to the Fail-Closed policy.
metadata
class-attribute
instance-attribute
metadata: dict[str, Any] = field(default_factory=dict)
Additional metadata.
allow
classmethod
allow(
emergency_level: str,
emergency_level_name: str,
namespace: str,
) -> InterlockResult
Factory for an allow result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| emergency_level | str | Emergency level value | required |
| emergency_level_name | str | Emergency level name | required |
| namespace | str | Namespace | required |
Returns:
| Type | Description |
|---|---|
| InterlockResult | InterlockResult with the ALLOW action |
allow_with_warning
classmethod
allow_with_warning(
emergency_level: str,
emergency_level_name: str,
namespace: str,
) -> InterlockResult
Factory for an allow-with-warning result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| emergency_level | str | Emergency level value | required |
| emergency_level_name | str | Emergency level name | required |
| namespace | str | Namespace | required |
Returns:
| Type | Description |
|---|---|
| InterlockResult | InterlockResult with the ALLOW_WITH_WARNING action |
pause
classmethod
pause(
emergency_level: str,
emergency_level_name: str,
namespace: str,
reason: str | None = None,
) -> InterlockResult
Factory for a pause result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| emergency_level | str | Emergency level value | required |
| emergency_level_name | str | Emergency level name | required |
| namespace | str | Namespace | required |
| reason | str \| None | Additional reason (default message if absent) | None |
Returns:
| Type | Description |
|---|---|
| InterlockResult | InterlockResult with the PAUSE action |
rollback
classmethod
rollback(
emergency_level: str,
emergency_level_name: str,
namespace: str,
reason: str | None = None,
) -> InterlockResult
Factory for a rollback result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| emergency_level | str | Emergency level value | required |
| emergency_level_name | str | Emergency level name | required |
| namespace | str | Namespace | required |
| reason | str \| None | Additional reason (default message if absent) | None |
Returns:
| Type | Description |
|---|---|
| InterlockResult | InterlockResult with the ROLLBACK action |
fail_closed
classmethod
fail_closed(
failure: InterlockCheckFailure,
namespace: str,
error_message: str,
) -> InterlockResult
Factory for a Fail-Closed result.
On a backend failure, treats it as LEVEL_3 and returns the ROLLBACK action.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| failure | InterlockCheckFailure | Failure type | required |
| namespace | str | Namespace | required |
| error_message | str | Error message | required |
Returns:
| Type | Description |
|---|---|
| InterlockResult | InterlockResult with the ROLLBACK action due to Fail-Closed |
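A trimmed, hypothetical replica of the dataclass shows two documented behaviours together: the Fail-Closed factory yields a non-allowed ROLLBACK flagged with `is_fail_closed`, and `field(default_factory=dict)` gives each result its own `metadata` dict. `Failure` and `Result` are local stand-ins; the `"level_3"` string and the reason text are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class Failure(str, Enum):
    # Local mirror of InterlockCheckFailure.
    BACKEND_UNAVAILABLE = "backend_unavailable"
    TRACKER_ERROR = "tracker_error"
    TIMEOUT = "timeout"


@dataclass
class Result:
    action: str
    allowed: bool
    reason: str
    emergency_level: str = "normal"
    namespace: str = ""
    check_failure: Failure | None = None
    is_fail_closed: bool = False
    # default_factory builds a fresh dict per instance (no shared mutable default).
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def fail_closed(cls, failure: Failure, namespace: str, error_message: str) -> Result:
        # A failed check is treated as LEVEL_3: roll back, never allow.
        return cls(
            action="rollback",
            allowed=False,
            reason=f"Fail-Closed ({failure.value}): {error_message}",
            emergency_level="level_3",  # assumed spelling of the LEVEL_3 value
            namespace=namespace,
            check_failure=failure,
            is_fail_closed=True,
        )
```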
block
classmethod
block(
emergency_level: str,
emergency_level_name: str,
namespace: str,
reason: str | None = None,
) -> InterlockResult
Factory for a block result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| emergency_level | str | Emergency level value | required |
| emergency_level_name | str | Emergency level name | required |
| namespace | str | Namespace | required |
| reason | str \| None | Additional reason (default message if absent) | None |
Returns:
| Type | Description |
|---|---|
| InterlockResult | InterlockResult with the BLOCK action |
CanaryConfigLock
CanaryConfigLock(
redis_client: Any, lock_timeout: timedelta | None = None
)
Config lock for canary rollouts.
Ensures only one rollout is active for a given config_type. Uses RedisDistributedLock for safe distributed operation.
Features
- Distributed lock (Redis SET NX PX)
- Rollout ID-based owner identification
- Auto-expiry (zombie lock prevention)
- Lock status and owner lookup
Example
```python
lock = CanaryConfigLock(redis_client)

# Acquire lock
if lock.acquire("circuit_breaker", "rollout-abc"):
    try:
        perform_rollout()
    finally:
        lock.release("circuit_breaker", "rollout-abc")
```
Initialize CanaryConfigLock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| redis_client | Any | Redis client instance | required |
| lock_timeout | timedelta \| None | Lock auto-expiry time (default from settings) | None |
acquire
acquire(
config_type: str,
rollout_id: str,
blocking: bool = False,
) -> bool
Acquire the configuration lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type (circuit_breaker, dlq, retry, etc.) | required |
| rollout_id | str | Rollout ID (for owner identification) | required |
| blocking | bool | If True, wait until the lock is acquired (default False) | False |
Returns:
| Type | Description |
|---|---|
| bool | Whether the lock was acquired |
Note
blocking=True is not recommended. A Canary rollout should fail immediately and notify the user.
release
release(config_type: str, rollout_id: str) -> bool
Release the configuration lock.
Releases after verifying the lock owner (handled atomically via a Lua script).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
| rollout_id | str | Rollout ID (for owner verification) | required |
Returns:
| Type | Description |
|---|---|
| bool | Whether the lock was released |
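The owner-checked acquire/release contract can be illustrated without Redis. The class below is a hypothetical in-memory stand-in: `acquire` mimics `SET key owner NX PX ttl`, `release` mimics an owner-compare-then-delete Lua script, and a `threading.Lock` plays the role of Redis executing each script atomically. It sketches the semantics only and does not work across processes.

```python
from __future__ import annotations

import threading
import time


class InMemoryOwnerLock:
    """Hypothetical single-process stand-in for the Redis-backed config lock."""

    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self._ttl = ttl_seconds
        self._entries: dict[str, tuple[str, float]] = {}  # key -> (owner, expires_at)
        self._mutex = threading.Lock()  # makes check-then-act atomic, like a Lua script

    def _owner(self, key: str) -> str | None:
        entry = self._entries.get(key)
        if entry is None or entry[1] <= time.monotonic():
            self._entries.pop(key, None)  # an expired lock disappears, like PX expiry
            return None
        return entry[0]

    def acquire(self, config_type: str, rollout_id: str) -> bool:
        # Mirrors SET key rollout_id NX PX ttl: succeed only if nobody holds the key.
        with self._mutex:
            if self._owner(config_type) is not None:
                return False
            self._entries[config_type] = (rollout_id, time.monotonic() + self._ttl)
            return True

    def release(self, config_type: str, rollout_id: str) -> bool:
        # Mirrors the owner-compare-then-delete script: only the owner may release.
        with self._mutex:
            if self._owner(config_type) != rollout_id:
                return False
            del self._entries[config_type]
            return True

    def get_lock_owner(self, config_type: str) -> str | None:
        with self._mutex:
            return self._owner(config_type)
```

Because expiry is checked on access, a crashed owner's lock stops counting once its TTL passes, which is the zombie-lock prevention listed in the features above.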
is_locked
is_locked(config_type: str) -> bool
Check the lock state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the lock is held |
get_lock_owner
get_lock_owner(config_type: str) -> str | None
Look up the current lock owner.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Lock owner (rollout ID) or None |
extend
extend(
config_type: str,
rollout_id: str,
additional_time: timedelta = None,
) -> bool
Extend the lock TTL.
Prevents lock expiry when a rollout takes longer than expected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
| rollout_id | str | Rollout ID (for owner verification) | required |
| additional_time | timedelta | Time to extend (default: same as the initial timeout) | None |
Returns:
| Type | Description |
|---|---|
| bool | Whether the extension succeeded |
force_release
force_release(config_type: str) -> bool
Force-release the lock (admin only).
For cleaning up zombie rollouts. Deletes the lock without owner verification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| bool | Whether the deletion succeeded |
Warning
This method should be used only with admin privileges. Use release() in normal situations.
ConfigLockError
ConfigLockError(
message: str,
config_type: str = "",
current_owner: str | None = None,
)
Bases: BaldurError
Config lock acquisition failed.
Raised when a rollout is already in progress for the same config_type.
Attributes:
| Name | Type | Description |
|---|---|---|
| config_type | str | Configuration type |
| current_owner | str \| None | Current lock owner (rollout ID) |
CanaryMetricsCollector
Collects canary rollout metrics via LiveCanaryEvaluator.
Queries real-time metrics and converts them to CanaryMetrics format.
collect
collect(rollout: CanaryRollout) -> list[CanaryMetrics]
Collect metrics for a canary rollout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout | CanaryRollout | The canary rollout to collect metrics for. | required |
Returns:
| Type | Description |
|---|---|
| list[CanaryMetrics] | List of CanaryMetrics for the current stage. |
CanaryMetrics
dataclass
CanaryMetrics(
cluster: str,
stage_name: str,
error_rate_before: float = 0.0,
error_rate_after: float = 0.0,
latency_p50_before: float = 0.0,
latency_p50_after: float = 0.0,
latency_p99_before: float = 0.0,
latency_p99_after: float = 0.0,
requests_total: int = 0,
errors_total: int = 0,
is_healthy: bool = True,
unhealthy_reason: str | None = None,
)
Metrics for a Canary stage.
Determines the health status by comparing metrics before and after the configuration change.
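One way to turn the before/after comparison into `is_healthy` and `unhealthy_reason` is sketched below. The thresholds (an absolute error-rate increase of 1 percentage point, a p99 ratio of 1.5) and the `judge` function are hypothetical; the module's real limits are presumably configured through PassCriteria.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StageMetrics:
    """Hypothetical subset of CanaryMetrics used by this sketch."""

    error_rate_before: float = 0.0
    error_rate_after: float = 0.0
    latency_p99_before: float = 0.0
    latency_p99_after: float = 0.0
    is_healthy: bool = True
    unhealthy_reason: str | None = None


def judge(m: StageMetrics, max_error_increase: float = 0.01, max_p99_ratio: float = 1.5) -> StageMetrics:
    """Flag the stage unhealthy when errors or p99 latency regress past the limits."""
    if m.error_rate_after - m.error_rate_before > max_error_increase:
        m.is_healthy = False
        m.unhealthy_reason = f"error rate {m.error_rate_before:.2%} -> {m.error_rate_after:.2%}"
    elif m.latency_p99_before > 0 and m.latency_p99_after / m.latency_p99_before > max_p99_ratio:
        # Skip the ratio when there is no baseline latency to compare against.
        m.is_healthy = False
        m.unhealthy_reason = f"p99 {m.latency_p99_before:.0f} -> {m.latency_p99_after:.0f}"
    return m
```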
CanaryRollout
dataclass
CanaryRollout(
id: str,
config_type: str,
previous_values: dict[str, Any],
new_values: dict[str, Any],
state: CanaryState = CanaryState.CREATED,
current_stage_index: int = 0,
stages: list[CanaryStage] = field(default_factory=list),
created_by: str = "",
created_at: datetime = field(default_factory=utc_now),
reason: str = "",
completed_at: datetime | None = None,
rollback_reason: str | None = None,
pause_reason: str | None = None,
pause_triggered_by: str | None = None,
paused_at: datetime | None = None,
stage_started_at: datetime | None = None,
version: int = 0,
)
Canary rollout information.
The full rollout plan and state for a single configuration change.
Example
```python
rollout = CanaryRollout(
    id="abc12345",
    config_type="circuit_breaker",
    previous_values={"failure_threshold": 5},
    new_values={"failure_threshold": 3},
    stages=[
        CanaryStage(name="canary", clusters=["seoul-canary"], percentage=10.0),
        CanaryStage(name="50%", clusters=["seoul-main"], percentage=50.0),
        CanaryStage(name="full", clusters=["tokyo", "singapore"], percentage=100.0),
    ],
    created_by="[email protected]",
    reason="Reduce failure threshold for faster detection",
)
```
pause_reason
class-attribute
instance-attribute
pause_reason: str | None = None
Pause reason (e.g., 'Error budget below threshold (5.0% < 10.0%)').
pause_triggered_by
class-attribute
instance-attribute
pause_triggered_by: str | None = None
Pause trigger type.
Values:
- "interlock": Automatically paused by the Safety Interlock (the provenance of every emergency-driven pause)
- "manual": Manually paused by an operator
- "chaos_guard": Paused by the Chaos Guard
- "metrics": Paused due to metric degradation
- "error_budget": Paused due to error budget exhaustion
- "governance": Paused due to a governance check failure
- "recovery_compensation": Re-paused while compensating a failed recovery
paused_at
class-attribute
instance-attribute
paused_at: datetime | None = None
Pause time.
stage_started_at
class-attribute
instance-attribute
stage_started_at: datetime | None = None
When the current stage's configuration was applied (None before start).
Set on start, on each promote to a next stage, and reset on resume so the
stage's observation window is measured from stage entry, not rollout
creation. None for rollouts persisted before this field existed —
consumers fall back to created_at.
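The fallback rule for the observation window can be written as a small helper. `stage_elapsed` is a hypothetical name; it only encodes the documented rule that a missing `stage_started_at` falls back to `created_at`.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def stage_elapsed(created_at: datetime, stage_started_at: datetime | None, now: datetime) -> timedelta:
    """How long the current stage has been under observation.

    Rollouts saved before stage_started_at existed carry None, so the
    window falls back to created_at.
    """
    anchor = stage_started_at if stage_started_at is not None else created_at
    return now - anchor


# Usage: a stage entered 30 minutes after creation, checked 45 minutes after creation.
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
elapsed = stage_elapsed(t0, t0 + timedelta(minutes=30), t0 + timedelta(minutes=45))
```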
version
class-attribute
instance-attribute
version: int = 0
Optimistic locking version, incremented on each save.
current_stage
property
current_stage: CanaryStage | None
Return the current stage.
affected_clusters
property
affected_clusters: list[str]
Clusters the new configuration has been applied to so far.
is_terminal
property
is_terminal: bool
Whether the rollout is in a terminal state.
progress_percentage
property
progress_percentage: float
Rollout progress (0.0 to 100.0).
RolloutSerializer
Serializes and deserializes CanaryRollout objects.
Ensures backward compatibility when fields are added.
serialize
serialize(rollout: CanaryRollout) -> dict[str, Any]
Serialize a CanaryRollout to a dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout | CanaryRollout | The rollout to serialize. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation suitable for JSON encoding. |
deserialize
deserialize(data: dict[str, Any]) -> CanaryRollout
Deserialize a dictionary to a CanaryRollout (backward compatible).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Dictionary representation of a rollout. | required |
Returns:
| Type | Description |
|---|---|
| CanaryRollout | Restored CanaryRollout instance. |
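Backward-compatible deserialization can be approached as below: keys missing from older payloads fall back to dataclass defaults, and unknown keys are ignored. `MiniRollout` is a hypothetical subset of CanaryRollout, and the ISO-8601 datetime encoding is an assumption about the stored format.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from datetime import datetime
from typing import Any


@dataclass
class MiniRollout:
    """Hypothetical subset of CanaryRollout."""

    id: str
    config_type: str
    stage_started_at: datetime | None = None  # added later: absent in old payloads
    version: int = 0  # added later: absent in old payloads


def serialize(rollout: MiniRollout) -> dict[str, Any]:
    data = {f.name: getattr(rollout, f.name) for f in fields(rollout)}
    if rollout.stage_started_at is not None:
        data["stage_started_at"] = rollout.stage_started_at.isoformat()
    return data


def deserialize(data: dict[str, Any]) -> MiniRollout:
    known = {f.name for f in fields(MiniRollout)}
    kwargs = {k: v for k, v in data.items() if k in known}  # ignore unknown keys
    raw = kwargs.get("stage_started_at")
    if isinstance(raw, str):
        kwargs["stage_started_at"] = datetime.fromisoformat(raw)
    # Keys missing from older payloads fall back to the dataclass defaults.
    return MiniRollout(**kwargs)
```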
CanaryRolloutService
CanaryRolloutService(
store: CanaryRolloutStore | None = None,
)
Bases: EventEmitterMixin
Canary Rollout management service (thin orchestrator).
Delegates to focused sub-modules for specific responsibilities:
- RolloutStateMachine: state transition validation
- ClusterApplicator: per-cluster config application
- CanaryEvaluator: shadow & live evaluation
- RolloutSerializer: serialization/deserialization
- CanaryMetricsCollector: metrics collection
Example
```python
service = get_canary_rollout_service()

# Create rollout
rollout = service.create_rollout(
    config_type="circuit_breaker",
    new_values={"failure_threshold": 3},
    stages=[...],
    created_by="[email protected]",
)

# Start -> Promote -> Complete
service.start_rollout(rollout.id)
service.promote(rollout.id)  # next stage
service.promote(rollout.id)  # complete
```
Initialize CanaryRolloutService.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| store | CanaryRolloutStore \| None | Optional CanaryRolloutStore injection (auto-resolved if None) | None |
store
property
store: CanaryRolloutStore | None
CanaryRolloutStore (lazy loading via ProviderRegistry).
rollout_ttl_days
property
rollout_ttl_days: int
Rollout data retention period (loaded from Settings).
config_history
property
config_history
ConfigHistoryService (lazy loading with graceful fallback).
chaos_guard
property
chaos_guard: CanaryChaosGuard
CanaryChaosGuard (lazy loading).
cluster_applicator
property
cluster_applicator: ClusterApplicator
ClusterApplicator (lazy loading).
create_rollout
create_rollout(
config_type: str,
new_values: dict[str, Any],
stages: list[CanaryStage],
created_by: str,
reason: str = "",
force_during_chaos: bool = False,
) -> CanaryRollout
Create a new Canary rollout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Config type (circuit_breaker, dlq, retry, etc.) | required |
| new_values | dict[str, Any] | New config values | required |
| stages | list[CanaryStage] | Rollout stage definitions | required |
| created_by | str | Creator (email or username) | required |
| reason | str | Change reason | '' |
| force_during_chaos | bool | Proceed even while a chaos experiment is running | False |
Returns:
| Type | Description |
|---|---|
| CanaryRollout | Created CanaryRollout |
Raises:
| Type | Description |
|---|---|
| ConfigLockError | When a rollout is already in progress for this config_type |
| ValueError | When stages is empty |
start_rollout
start_rollout(
rollout_id: str,
force_during_chaos: bool = False,
bypass_shadow: bool = False,
bypass_shadow_reason: str = "",
bypass_governance: bool = False,
bypass_reason: str = "",
requested_by: str = "",
) -> bool
Start a Canary rollout (apply the first stage).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| force_during_chaos | bool | Proceed even while a chaos experiment is running | False |
| bypass_shadow | bool | Proceed even if shadow evaluation fails | False |
| bypass_shadow_reason | str | Reason for the shadow bypass (min length: settings.bypass_min_reason_length) | '' |
| bypass_governance | bool | Skip governance validation (audit required) | False |
| bypass_reason | str | Reason for the governance bypass (required when bypass_governance=True, min length from settings) | '' |
| requested_by | str | Requester (for audit logging) | '' |
Returns:
| Type | Description |
|---|---|
| bool | Success status |
promote
promote(
rollout_id: str,
force: bool = False,
bypass_governance: bool = False,
bypass_reason: str = "",
requested_by: str = "",
tier_id: str | None = None,
) -> bool
Promote the rollout to the next stage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| force | bool | Skip metric validation (legacy) | False |
| bypass_governance | bool | Skip governance validation (audit required) | False |
| bypass_reason | str | Reason for bypass (required when bypass_governance=True, min length from settings) | '' |
| requested_by | str | Requester (for audit logging) | '' |
| tier_id | str \| None | Tier ID for apply_tier_floor (None = not applied) | None |
Returns:
| Type | Description |
|---|---|
| bool | Success status |
rollback
rollback(
rollout_id: str,
reason: str = "",
bypass_governance: bool = False,
bypass_reason: str = "",
requested_by: str = "",
) -> bool
Roll back the rollout.
Restores every cluster the new config has been applied to back to the previous config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| reason | str | Rollback reason | '' |
| bypass_governance | bool | Skip governance validation (audit required) | False |
| bypass_reason | str | Reason for bypass (required when bypass_governance=True, min length from settings) | '' |
| requested_by | str | Requester (for audit logging) | '' |
Returns:
| Type | Description |
|---|---|
| bool | Success status |
pause
pause(
rollout_id: str,
reason: str = "",
triggered_by: str = "manual",
) -> bool
Pause rollout (with reason tracking).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| reason | str | Pause reason | '' |
| triggered_by | str | Trigger type (manual, interlock, chaos_guard, metrics, error_budget, governance, recovery_compensation) | 'manual' |
Returns:
| Type | Description |
|---|---|
| bool | Success status |
resume
resume(
rollout_id: str,
*,
bypass_governance: bool = False,
bypass_reason: str = "",
requested_by: str = ""
) -> bool
Resume a paused rollout.
Performs an emergency-only governance re-check (no kill switch, no error budget). If Emergency Mode is at or above the configured min level, resume is blocked — the rollout stays PAUSED.
The auto-recovery CANARY_RESUME step uses bypass_governance=True so a
recovery can resume canaries while the emergency it is recovering from is
still elevated; the gate stays fail-closed for operator/REST resumes.
Mirrors rollback()'s bypass signature.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| bypass_governance | bool | Skip the emergency governance re-check (audit required) | False |
| bypass_reason | str | Reason for bypass (required when bypass_governance=True, min length from settings) | '' |
| requested_by | str | Requester (for audit logging) | '' |
Returns:
| Type | Description |
|---|---|
| bool | Success status |
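The resume gate described above reduces to a small decision function. Numeric emergency levels, `min_block_level`, the 10-character `min_reason_length`, and the `check_resume` name are hypothetical stand-ins for the configured settings and the real implementation.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ResumeDecision:
    allowed: bool
    reason: str


def check_resume(
    emergency_level: int,
    min_block_level: int,
    *,
    bypass_governance: bool = False,
    bypass_reason: str = "",
    min_reason_length: int = 10,  # hypothetical; the real value comes from settings
) -> ResumeDecision:
    if bypass_governance:
        # A bypass is only honoured with a sufficiently descriptive (audited) reason.
        if len(bypass_reason.strip()) < min_reason_length:
            return ResumeDecision(False, "bypass_reason too short")
        return ResumeDecision(True, "emergency re-check bypassed (audited)")
    if emergency_level >= min_block_level:
        # Fail closed: the rollout stays PAUSED while the emergency persists.
        return ResumeDecision(False, f"emergency level {emergency_level} >= {min_block_level}")
    return ResumeDecision(True, "emergency re-check passed")
```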
get_paused_rollouts
get_paused_rollouts(
triggered_by_whitelist: list[str] | None = None,
) -> list[CanaryRollout]
Get PAUSED rollouts, optionally filtered by pause trigger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| triggered_by_whitelist | list[str] \| None | Trigger types to include. None returns all PAUSED rollouts; a list such as ["error_budget", "governance"] returns only rollouts paused by those triggers. | None |
Returns:
| Type | Description |
|---|---|
| list[CanaryRollout] | List of PAUSED CanaryRollout instances matching the filter. |
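The whitelist semantics can be expressed as a simple filter. `Paused` and `filter_paused` are hypothetical, as is the `"paused"` state string; in this sketch an empty list matches nothing, which may differ from the real method.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Paused:
    """Hypothetical subset of CanaryRollout."""

    id: str
    state: str
    pause_triggered_by: str | None


def filter_paused(rollouts: list[Paused], triggered_by_whitelist: list[str] | None = None) -> list[Paused]:
    paused = [r for r in rollouts if r.state == "paused"]
    if triggered_by_whitelist is None:
        return paused  # no filter: every PAUSED rollout
    allowed = set(triggered_by_whitelist)
    return [r for r in paused if r.pause_triggered_by in allowed]
```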
cancel
cancel(rollout_id: str, reason: str = '') -> bool
Cancel a rollout (only allowed before it has started).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| reason | str | Cancel reason | '' |
Returns:
| Type | Description |
|---|---|
| bool | Success status |
renew_config_lock
renew_config_lock(
rollout: CanaryRollout,
) -> LockRenewalOutcome
Renew a live rollout's config-type lock (watchdog-hosted).
Extends the lock TTL so a rollout that stays active past
lock_timeout_minutes keeps the single-active-rollout invariant,
while the crash-freeze valve is preserved: only rollouts the watchdog
visits are renewed, so a stopped/disabled watchdog lane lets locks
expire at TTL.
On extend failure the lock state is diagnosed via an owner lookup and recovered where safe — a lapsed lock under a still-live, started rollout is re-acquired, bounding the duplicate-create window after a watchdog/Redis outage to at most one scan cadence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout | CanaryRollout | The active rollout whose lock should be renewed. | required |
Returns:
| Type | Description |
|---|---|
| LockRenewalOutcome | LockRenewalOutcome describing what happened. |
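The extend-then-diagnose flow above can be sketched as a pure decision function. The `Outcome` values, the `renew` function, and the `live`/`started` flags are hypothetical; the real LockRenewalOutcome may use different names. The lock is any object with `extend`, `get_lock_owner`, and `acquire`; a `SimpleNamespace` stub stands in for CanaryConfigLock in the usage lines.

```python
from __future__ import annotations

from enum import Enum
from types import SimpleNamespace
from typing import Any


class Outcome(str, Enum):
    # Hypothetical outcome names for illustration only.
    EXTENDED = "extended"
    REACQUIRED = "reacquired"
    HELD_BY_OTHER = "held_by_other"
    FAILED = "failed"


def renew(lock: Any, config_type: str, rollout_id: str, *, live: bool, started: bool) -> Outcome:
    """Extend the lock; on failure, look up the owner and recover only where safe."""
    if lock.extend(config_type, rollout_id):
        return Outcome.EXTENDED
    owner = lock.get_lock_owner(config_type)
    if owner is None and live and started:
        # The lock lapsed under a rollout that is still running: take it back.
        return Outcome.REACQUIRED if lock.acquire(config_type, rollout_id) else Outcome.FAILED
    if owner is not None and owner != rollout_id:
        # Another rollout holds the lock; never touch it.
        return Outcome.HELD_BY_OTHER
    return Outcome.FAILED


# Usage: extend() fails because the lock has lapsed; the rollout is still live.
lapsed_lock = SimpleNamespace(
    extend=lambda config_type, rollout_id: False,
    get_lock_owner=lambda config_type: None,
    acquire=lambda config_type, rollout_id: True,
)
outcome = renew(lapsed_lock, "circuit_breaker", "rollout-abc", live=True, started=True)
```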
get_rollout
get_rollout(rollout_id: str) -> CanaryRollout | None
Get a rollout by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
Returns:
| Type | Description |
|---|---|
| CanaryRollout \| None | CanaryRollout or None |
get_active_rollouts
get_active_rollouts() -> list[CanaryRollout]
Get the list of active rollouts.
Returns:
| Type | Description |
|---|---|
| list[CanaryRollout] | List of active CanaryRollout instances |
get_rollout_for_config
get_rollout_for_config(
config_type: str,
) -> CanaryRollout | None
Get the active rollout for a specific config_type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Config type | required |
Returns:
| Type | Description |
|---|---|
| CanaryRollout \| None | Active rollout or None |
get_completed_rollouts
get_completed_rollouts(
limit: int = 20,
) -> list[CanaryRollout]
Get completed rollouts (completed/rolled_back/failed/cancelled).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum count (default 20) | 20 |
Returns:
| Type | Description |
|---|---|
| list[CanaryRollout] | List of completed CanaryRollout instances |
collect_metrics
collect_metrics(rollout_id: str) -> list[CanaryMetrics]
Collect rollout metrics (Public API).
Delegates to CanaryMetricsCollector for real-time metric collection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
Returns:
| Type | Description |
|---|---|
| list[CanaryMetrics] | List of CanaryMetrics |
RolloutStateMachine
Canary Rollout state transition rules and validation.
Encapsulates all valid state transitions and provides a single source of truth for lifecycle validation.
can_transition
can_transition(
current: CanaryState, target: CanaryState
) -> bool
Check if a state transition is valid.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| current | CanaryState | Current state. | required |
| target | CanaryState | Desired target state. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the transition is allowed. |
transition
transition(
rollout: CanaryRollout, target: CanaryState
) -> bool
Validate and execute a state transition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout | CanaryRollout | The rollout to transition. | required |
| target | CanaryState | Desired target state. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the transition was executed successfully. |
is_terminal
is_terminal(state: CanaryState) -> bool
Check if a state is terminal (no outgoing transitions).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | CanaryState | The state to check. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the state is terminal. |
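A table-driven version of these rules is sketched below. Apart from CREATED and PAUSED, the state names and the exact edges are assumptions inferred from this page (cancel only before start; completed, rolled_back, failed, and cancelled as finished states); CanaryState and RolloutStateMachine define the real set.

```python
from enum import Enum


class State(str, Enum):
    CREATED = "created"
    IN_PROGRESS = "in_progress"  # assumed name for the active state
    PAUSED = "paused"
    COMPLETED = "completed"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed transition table; terminal states have no outgoing edges.
TRANSITIONS: dict = {
    State.CREATED: frozenset({State.IN_PROGRESS, State.CANCELLED}),
    State.IN_PROGRESS: frozenset({State.PAUSED, State.COMPLETED, State.ROLLED_BACK, State.FAILED}),
    State.PAUSED: frozenset({State.IN_PROGRESS, State.ROLLED_BACK}),
    State.COMPLETED: frozenset(),
    State.ROLLED_BACK: frozenset(),
    State.FAILED: frozenset(),
    State.CANCELLED: frozenset(),
}


def can_transition(current: State, target: State) -> bool:
    return target in TRANSITIONS[current]


def is_terminal(state: State) -> bool:
    # A state is terminal exactly when it has no outgoing transitions.
    return not TRANSITIONS[state]
```

Keeping the edges in one table means `can_transition` and `is_terminal` cannot drift apart, which is the single-source-of-truth property described above.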
VersionChecker
Configuration version checker.
Detects concurrent modification conflicts with the Optimistic Locking pattern.
Example
```python
checker = VersionChecker()

# Check the version
is_valid, info = checker.check(
    config_type="circuit_breaker",
    expected_version=8,
)

if not is_valid:
    raise VersionConflictError(
        expected_version=8,
        actual_version=info["actual_version"],
        conflicting_operator=info["changed_by"],
        config_type="circuit_breaker",
    )
```
check
check(
config_type: str, expected_version: int
) -> tuple[bool, dict]
Check whether the version matches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
| expected_version | int | Expected current version | required |
Returns:
| Type | Description |
|---|---|
| tuple[bool, dict] | (matches, details): whether the version matches, and a details dict {"actual_version": int, "changed_by": str} |
get_current_version
get_current_version(config_type: str) -> int
Look up the current version number.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type | required |
Returns:
| Type | Description |
|---|---|
| int | Current version number (0 if absent) |
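The optimistic-locking check behind VersionChecker can be illustrated with an in-memory store. `VersionedConfig` and `VersionConflict` are hypothetical stand-ins. This check-then-write is only safe within one process; a shared store needs the comparison and the increment to happen atomically.

```python
from __future__ import annotations

from dataclasses import dataclass


class VersionConflict(Exception):
    """Hypothetical stand-in for VersionConflictError."""

    def __init__(self, expected: int, actual: int, changed_by: str) -> None:
        super().__init__(f"expected v{expected}, found v{actual} (changed by {changed_by})")
        self.expected = expected
        self.actual = actual
        self.changed_by = changed_by


@dataclass
class VersionedConfig:
    """Hypothetical single-process store illustrating optimistic locking."""

    version: int = 0
    changed_by: str = ""

    def check(self, expected_version: int) -> tuple[bool, dict]:
        # Same shape as VersionChecker.check(): (matches, details).
        info = {"actual_version": self.version, "changed_by": self.changed_by}
        return self.version == expected_version, info

    def write(self, expected_version: int, operator: str) -> int:
        matches, info = self.check(expected_version)
        if not matches:
            raise VersionConflict(expected_version, info["actual_version"], info["changed_by"])
        self.version += 1  # every successful write bumps the version
        self.changed_by = operator
        return self.version
```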
VersionConflictError
VersionConflictError(
expected_version: int,
actual_version: int,
conflicting_operator: str,
config_type: str,
)
Bases: BaldurError
Config version conflict.
Raised when optimistic locking detects version mismatch.
Attributes:
| Name | Type | Description |
|---|---|---|
| expected_version | int | Expected current version |
| actual_version | int | Actual current version |
| conflicting_operator | str | Operator who made the conflicting change |
| config_type | str | Configuration type |
log_canary_action
log_canary_action(
action: str,
rollout: CanaryRollout,
safety_check_result: dict[str, Any] | None = None,
additional_context: dict[str, Any] | None = None,
) -> None
Write an audit log entry for a Canary Rollout action.
Records all Canary actions for forensics and compliance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| action | str | Action type (start, promote, rollback, pause, resume, etc.) | required |
| rollout | CanaryRollout | Rollout information | required |
| safety_check_result | dict[str, Any] \| None | Pre-check results (chaos conflict, version check, etc.) | None |
| additional_context | dict[str, Any] \| None | Additional context (failure reason, metrics, etc.) | None |
Example
```python
log_canary_action(
    action="start",
    rollout=rollout,
    safety_check_result={
        "chaos_guard": "passed",
        "config_lock": "acquired",
    },
    additional_context={
        "triggered_by": "api",
    },
)
```
log_canary_error
log_canary_error(
action: str,
rollout_id: str,
config_type: str,
error: Exception,
operator: str,
) -> None
Log a failed Canary operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| action | str | Attempted action | required |
| rollout_id | str | Rollout ID | required |
| config_type | str | Configuration type | required |
| error | Exception | Raised exception | required |
| operator | str | Operator who attempted the action | required |
log_canary_metrics_check
log_canary_metrics_check(
rollout_id: str,
stage_name: str,
metrics: dict[str, Any],
passed: bool,
failure_reason: str | None = None,
) -> None
Log the results of a metrics check.
Records the metrics check performed before a promotion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rollout_id | str | Rollout ID | required |
| stage_name | str | Stage name | required |
| metrics | dict[str, Any] | Collected metrics | required |
| passed | bool | Whether the check passed | required |
| failure_reason | str \| None | Failure reason (when the check fails) | None |
get_cross_cluster_notifier
get_cross_cluster_notifier() -> CrossClusterNotifier
Return the CrossClusterNotifier singleton instance.
get_governance_policy_sync
get_governance_policy_sync() -> GovernancePolicySync
Return the GovernancePolicySync singleton instance.
get_propagation_request_service
get_propagation_request_service() -> (
CrossClusterPropagationRequest
)
Return the CrossClusterPropagationRequest singleton instance.
reset_cross_cluster_services
reset_cross_cluster_services() -> None
Reset the singleton instances (for tests).
get_canary_feature_flag
get_canary_feature_flag() -> CanaryFeatureFlag
Return the CanaryFeatureFlag singleton instance.
reset_canary_feature_flag
reset_canary_feature_flag() -> None
Reset the singleton instance (for testing).
get_canary_safety_interlock
get_canary_safety_interlock() -> CanarySafetyInterlock
Return the CanarySafetyInterlock singleton.
Returns:
| Type | Description |
|---|---|
| CanarySafetyInterlock | CanarySafetyInterlock instance |
reset_canary_safety_interlock
reset_canary_safety_interlock() -> None
Reset the singleton (for testing).
apply_tier_floor
apply_tier_floor(
user_criteria: PassCriteria, tier_id: str
) -> PassCriteria
Apply the stricter of the user criterion and the tier floor, field by field:
- "max" fields: min(user, tier), since a smaller value is stricter
- "min" fields: max(user, tier), since a larger value is stricter
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_criteria | PassCriteria | PassCriteria specified by the user | required |
| tier_id | str | One of "critical", "standard", or "non_essential" | required |
Returns:
| Type | Description |
|---|---|
| PassCriteria | A new PassCriteria with the tier floor applied |
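The stricter-of rule can be sketched with `dataclasses.replace`. The `Criteria` fields and the `TIER_FLOORS` numbers are hypothetical placeholders, not the real PassCriteria fields or tier values; only the min/max combination mirrors the description above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Criteria:
    # Hypothetical PassCriteria fields: "max_*" are upper bounds, "min_*" lower bounds.
    max_error_rate: float
    max_latency_p99: float
    min_request_count: int


# Hypothetical tier floors; real values come from the tier configuration.
TIER_FLOORS = {
    "critical": Criteria(max_error_rate=0.001, max_latency_p99=300, min_request_count=1000),
    "standard": Criteria(max_error_rate=0.01, max_latency_p99=500, min_request_count=500),
    "non_essential": Criteria(max_error_rate=0.05, max_latency_p99=1000, min_request_count=100),
}


def apply_floor(user: Criteria, tier_id: str) -> Criteria:
    floor = TIER_FLOORS[tier_id]
    # replace() returns a new object, so the caller's criteria are never mutated.
    return replace(
        user,
        max_error_rate=min(user.max_error_rate, floor.max_error_rate),
        max_latency_p99=min(user.max_latency_p99, floor.max_latency_p99),
        min_request_count=max(user.min_request_count, floor.min_request_count),
    )
```

A user can therefore tighten the tier floor but never loosen it.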
get_canary_rollout_service
get_canary_rollout_service() -> CanaryRolloutService
Return the CanaryRolloutService singleton.
Returns:
| Type | Description |
|---|---|
| CanaryRolloutService | CanaryRolloutService instance |
reset_canary_rollout_service
reset_canary_rollout_service() -> None
Reset the singleton (for testing).
Warning
Do not use in production.
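The get/reset pair follows the usual lazily created singleton pattern, sketched here with a hypothetical `Service` class and `get_service`/`reset_service` functions. The double-checked lock is this sketch's choice for thread safety; the real accessors may be implemented differently.

```python
from __future__ import annotations

import threading


class Service:
    """Hypothetical stand-in for CanaryRolloutService."""


_instance: Service | None = None
_lock = threading.Lock()


def get_service() -> Service:
    global _instance
    if _instance is None:
        with _lock:
            if _instance is None:  # re-check so concurrent callers share one instance
                _instance = Service()
    return _instance


def reset_service() -> None:
    """Drop the cached instance so the next get_service() builds a fresh one (tests only)."""
    global _instance
    with _lock:
        _instance = None
```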
is_canary_operation
is_canary_operation() -> bool
Check if the current execution context is a canary rollout operation.
Uses the domain_tag ContextVar to detect a canary context.
Returns:
| Type | Description |
|---|---|
| bool | True if the current context is tagged as a canary operation |
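A canary context check built on `contextvars` might look like the following. The `domain_tag` variable shown here, the `"canary"` tag value, and the `tagged` helper are assumptions for illustration; only the use of a ContextVar comes from the description above.

```python
from __future__ import annotations

from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator

# Hypothetical tag variable; the real domain_tag ContextVar may use other values.
domain_tag: ContextVar[str | None] = ContextVar("domain_tag", default=None)


@contextmanager
def tagged(tag: str) -> Iterator[None]:
    """Set the domain tag for the duration of a block, then restore the previous value."""
    token = domain_tag.set(tag)
    try:
        yield
    finally:
        domain_tag.reset(token)


def is_canary_operation() -> bool:
    return domain_tag.get() == "canary"
```

Because ContextVar values are per-context, a tag set inside one asyncio task does not leak into concurrently running tasks.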
check_version_and_rollback
check_version_and_rollback(
config_type: str,
target_version: int,
expected_current_version: int,
rolled_back_by: str,
) -> ConfigVersion
Perform a rollback after checking the version.
Uses optimistic locking to detect a conflicting change and performs the rollback only if there is none. On a conflict, writes an audit log entry and raises VersionConflictError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_type | str | Configuration type (circuit_breaker, dlq, etc.) | required |
| target_version | int | Target version to roll back to | required |
| expected_current_version | int | Expected current version (optimistic lock) | required |
| rolled_back_by | str | Operator performing the rollback (email or username) | required |
Returns:
| Type | Description |
|---|---|
| ConfigVersion | Newly created rollback version (ConfigVersion) |
Raises:
| Type | Description |
|---|---|
| VersionConflictError | On a version conflict |
| ValueError | When the target version does not exist |
Example
```python
target_version = 5

try:
    new_version = check_version_and_rollback(
        config_type="circuit_breaker",
        target_version=target_version,
        expected_current_version=8,
        rolled_back_by="[email protected]",
    )
    print(f"Rolled back to v{target_version}, new version: v{new_version.version}")
except VersionConflictError as e:
    print(f"Conflict: {e.conflicting_operator} modified while you were working")
```