
baldur.services — Service Access

The re-export facade for service getters and the core service classes. Resolve the circuit-breaker and replay services here; the SLA-threshold helper exposes the configured breach thresholds.

Service getters

get_circuit_breaker_service

get_circuit_breaker_service() -> CircuitBreakerService

Return the runtime-scoped CircuitBreakerService singleton.

Delegates to the active baldur.runtime.BaldurRuntime, so test isolation, copy_context() scoping, and runtime swap-in all work transparently through the runtime's singleton store.

Implemented as an explicit def wrapper (rather than by tuple-unpacking the make_singleton_factory return value directly) so that docs tooling (mkdocstrings/griffe) can discover the symbol statically and so that it carries a public-surface docstring, as the reference page contract requires.
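A minimal sketch of how a runtime-scoped singleton getter can behave, assuming the active runtime is held in a ContextVar. Runtime, _active_runtime, and run_in_fresh_runtime are hypothetical stand-ins, not the baldur.runtime API, and the empty CircuitBreakerService class is only a placeholder for the real one. It shows why copy_context() scoping gives test isolation: a runtime swapped in inside a copied context never leaks out of it.

```python
from contextvars import ContextVar, copy_context
from typing import Any, Callable, TypeVar

T = TypeVar("T")


class Runtime:
    """Hypothetical runtime that owns a per-runtime singleton store."""

    def __init__(self) -> None:
        self._singletons: dict[str, Any] = {}

    def get_or_create(self, key: str, factory: Callable[[], T]) -> T:
        # One instance per key, per runtime.
        if key not in self._singletons:
            self._singletons[key] = factory()
        return self._singletons[key]


_default_runtime = Runtime()
_active_runtime: ContextVar[Runtime] = ContextVar("active_runtime", default=_default_runtime)


class CircuitBreakerService:
    """Placeholder for the real service class."""


def get_circuit_breaker_service() -> CircuitBreakerService:
    """Explicit def wrapper: statically discoverable and documented."""
    return _active_runtime.get().get_or_create("circuit_breaker", CircuitBreakerService)


def run_in_fresh_runtime(fn: Callable[[], T]) -> T:
    """Run fn with a swapped-in runtime inside a copied context."""

    def _inner() -> T:
        _active_runtime.set(Runtime())  # only visible inside the copied context
        return fn()

    return copy_context().run(_inner)
```

Inside the copied context the getter returns a different instance; outside it, the default runtime's singleton is untouched.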

get_replay_service

get_replay_service() -> ReplayService

Get the singleton replay service instance.

get_sla_thresholds

get_sla_thresholds() -> SLASettings

Get SLA thresholds configuration.

Core service classes

CircuitBreakerService

CircuitBreakerService(
    config: CircuitBreakerConfig | None = None,
    repository: CircuitBreakerStateRepository | None = None,
)

Bases: EventEmitterMixin, ProtectionMixin, ManualControlMixin

Circuit Breaker Service.

Provides management operations for circuit breaker states. Designed for manual (toggle-based) control by operators.

Usage

service = CircuitBreakerService()

# Force open (block requests)
result = service.force_open(
    service_name="external_api",
    reason="External service maintenance",
    controlled_by=admin_user,
)

# Force close (allow requests)
result = service.force_close(
    service_name="external_api",
    reason="Service recovered",
    controlled_by=admin_user,
    trigger_replay=True,
)

# Check if requests should be allowed
if service.should_allow("external_api"):
    ...  # proceed with request

# For testing with a mock repository
from unittest.mock import Mock

mock_repo = Mock(spec=CircuitBreakerStateRepository)
service = CircuitBreakerService(repository=mock_repo)

Initialize the circuit breaker service.

Parameters:

  - config (CircuitBreakerConfig | None, default: None): Optional configuration; loaded from settings if None.
  - repository (CircuitBreakerStateRepository | None, default: None): Optional repository for dependency injection; the Django adapter is used if None.

repository property

repository: CircuitBreakerStateRepository

Get the repository using ProviderRegistry with fallback policy.

is_enabled property

is_enabled: bool

Check if circuit breaker is enabled.

register_downstream_checker

register_downstream_checker(
    checker: Callable[[str], bool],
) -> None

Register a should_allow() pre-check hook.

If checker(service_name) returns False, a preemptive fallback is triggered. A checker MUST only perform local in-memory lookups (no external I/O).
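A simplified model of the hook contract, assuming registered checkers are consulted before the breaker's own state and that any False short-circuits the call. MiniBreaker and the degraded set are hypothetical; the checker only reads an in-memory set, in line with the no-external-I/O rule.

```python
from typing import Callable


class MiniBreaker:
    """Hypothetical, simplified model of should_allow() pre-check hooks."""

    def __init__(self) -> None:
        self._checkers: list[Callable[[str], bool]] = []
        self._open: set[str] = set()

    def register_downstream_checker(self, checker: Callable[[str], bool]) -> None:
        self._checkers.append(checker)

    def should_allow(self, service_name: str) -> bool:
        # Pre-checks run first; any False means a preemptive fallback (reject).
        for checker in self._checkers:
            if not checker(service_name):
                return False
        return service_name not in self._open


# A checker that only does a local set lookup.
degraded = {"payments"}
breaker = MiniBreaker()
breaker.register_downstream_checker(lambda name: name not in degraded)
```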

apply_threshold_override

apply_threshold_override(
    service_name: str, override: Any
) -> None

Apply a threshold override set by the mesh coordinator.

While the override is active, the service's failure_threshold and recovery_timeout use the override values.

remove_threshold_override

remove_threshold_override(service_name: str) -> None

Remove the threshold override and revert to the original config.

get_effective_config

get_effective_config(
    service_name: str,
) -> CircuitBreakerConfig

Return the effective config with overrides applied.

The lookup is served from the L1 local cache, so there is no external I/O. Without an override, the base config is returned; otherwise only the overridden fields are replaced.
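The override lifecycle (apply, resolve, remove) can be sketched with dataclasses.replace, which copies a dataclass and swaps only the named fields. This is a sketch under assumptions: BreakerConfig is a hypothetical subset of CircuitBreakerConfig, and overrides are passed as keyword fields here, whereas the documented apply_threshold_override takes a single override: Any object.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class BreakerConfig:
    """Hypothetical subset of CircuitBreakerConfig fields."""
    failure_threshold: int = 5
    recovery_timeout: float = 60.0
    minimum_calls: int = 10


class OverrideStore:
    def __init__(self, base: BreakerConfig) -> None:
        self._base = base
        # Local in-memory map: lookups never leave the process.
        self._overrides: dict[str, dict[str, Any]] = {}

    def apply_threshold_override(self, service_name: str, **fields: Any) -> None:
        self._overrides[service_name] = fields

    def remove_threshold_override(self, service_name: str) -> None:
        self._overrides.pop(service_name, None)

    def get_effective_config(self, service_name: str) -> BreakerConfig:
        fields = self._overrides.get(service_name)
        if not fields:
            return self._base
        # Replace only the overridden fields; the rest comes from the base config.
        return replace(self._base, **fields)
```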

get_or_create_state

get_or_create_state(
    service_name: str,
) -> CircuitBreakerStateData

Get or create a circuit breaker state for a service.

Parameters:

  - service_name (str, required): Name of the external service.

Returns:

  - CircuitBreakerStateData: CircuitBreakerStateData instance.

get_state

get_state(service_name: str) -> str

Get the current state of a circuit breaker.

Parameters:

  - service_name (str, required): Name of the external service.

Returns:

  - str: Current state (closed, open, or half_open).

should_allow

should_allow(service_name: str) -> bool

Check if requests should be allowed through the circuit breaker.

Post-476: HALF_OPEN slot acquisition is delegated to the repository's atomic try_acquire_half_open_slot, so the per-service counter is accurate cluster-wide (via a Redis Lua script) rather than best-effort per process.

Parameters:

  - service_name (str, required): Name of the external service.

Returns:

  - bool: True if requests should be allowed, False if blocked.
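The HALF_OPEN admission rule can be illustrated with a single-process analogue of an atomic check-and-increment. This is an assumption-laden sketch: the documented repository achieves the atomicity cluster-wide with Redis Lua, whereas here a threading.Lock only guarantees it within one process. HalfOpenSlots and max_slots are hypothetical names, and slot release is not modeled.

```python
import threading


class HalfOpenSlots:
    """Single-process analogue of an atomic try_acquire_half_open_slot."""

    def __init__(self, max_slots: int) -> None:
        self._max = max_slots
        self._used: dict[str, int] = {}
        self._lock = threading.Lock()

    def try_acquire_half_open_slot(self, service_name: str) -> bool:
        # Check and increment under one lock so no two callers share a slot.
        with self._lock:
            used = self._used.get(service_name, 0)
            if used >= self._max:
                return False
            self._used[service_name] = used + 1
            return True


def should_allow(state: str, service_name: str, slots: HalfOpenSlots) -> bool:
    if state == "closed":
        return True
    if state == "open":
        return False
    # half_open: only a bounded number of trial requests get through.
    return slots.try_acquire_half_open_slot(service_name)


slots = HalfOpenSlots(max_slots=3)
results: list[bool] = []
threads = [
    threading.Thread(target=lambda: results.append(should_allow("half_open", "external_api", slots)))
    for _ in range(20)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

However many threads race, exactly max_slots trial requests are admitted.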

should_allow_with_state

should_allow_with_state(
    service_name: str,
) -> CircuitBreakerDecision

Companion API to should_allow that returns the admission decision and the resolved state in a single call.

Eliminates the redundant get_or_create_state lookup that CircuitBreakerPolicy.execute() previously incurred on the reject path: the policy can now read decision.allowed for branching and decision.state.state for the rejection metadata without acquiring the repository RLock a second time.

When is_enabled is False, the method returns a CircuitBreakerDecision with allowed=True and a freshly fetched state, so direct callers of the companion API always receive a non-None state regardless of the feature flag. CircuitBreakerPolicy short-circuits on is_enabled before invoking this method, so the disabled-breaker fetch stays off the hot path.
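The single-lookup pattern can be sketched as follows. StateData, Decision, Repo, and execute are hypothetical stand-ins for CircuitBreakerStateData, CircuitBreakerDecision, the repository, and the policy; admission is simplified to "not open", ignoring HALF_OPEN slots. The point is that the reject branch reads its metadata from the decision instead of re-reading the repository.

```python
from dataclasses import dataclass


@dataclass
class StateData:
    """Hypothetical stand-in for CircuitBreakerStateData."""
    service_name: str
    state: str = "closed"
    failure_count: int = 0


@dataclass(frozen=True)
class Decision:
    """Hypothetical stand-in for CircuitBreakerDecision."""
    allowed: bool
    state: StateData


class Repo:
    def __init__(self) -> None:
        self.reads = 0
        self._states: dict[str, StateData] = {}

    def get_or_create_state(self, name: str) -> StateData:
        self.reads += 1
        return self._states.setdefault(name, StateData(name))


def should_allow_with_state(repo: Repo, name: str, enabled: bool = True) -> Decision:
    state = repo.get_or_create_state(name)  # one lookup serves both fields
    if not enabled:
        # Disabled breaker: still allowed, still a non-None state.
        return Decision(allowed=True, state=state)
    return Decision(allowed=state.state != "open", state=state)


def execute(repo: Repo, name: str) -> str:
    decision = should_allow_with_state(repo, name)
    if not decision.allowed:
        # Rejection metadata comes from the same decision: no second read.
        return f"rejected: circuit is {decision.state.state}"
    return "called"
```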

should_allow_with_fallback

should_allow_with_fallback(
    service_name: str,
    cache_key: str | None = None,
    default_response: Any | None = None,
    request_data: dict[str, Any] | None = None,
) -> CircuitBreakerFallbackResult

Check if requests should be allowed with fallback strategy support.

Deprecated: This method is deprecated. Use a CircuitBreakerPolicy + FallbackPolicy combination instead.

When the circuit is open, instead of simply blocking, this method can:

  1. Return cached (stale) data
  2. Queue the request to the DLQ for later retry
  3. Return a default/static response

Parameters:

  - service_name (str, required): Name of the external service.
  - cache_key (str | None, default: None): Optional Redis key for cached data lookup.
  - default_response (Any | None, default: None): Optional default response to return.
  - request_data (dict[str, Any] | None, default: None): Optional request data for DLQ queueing.

Returns:

  - CircuitBreakerFallbackResult: CircuitBreakerFallbackResult with the decision and optional fallback data.

get_total_calls

get_total_calls(service_name: str) -> int

Get total call count for a service (success + failure).

Used for minimum_calls check to prevent false positives.

Parameters:

  - service_name (str, required): Name of the external service.

Returns:

  - int: Total number of calls tracked.

get_all_states

get_all_states() -> list[dict[str, Any]]

Get all circuit breaker states.

Returns:

  - list[dict[str, Any]]: List of state dictionaries.

get_open_states

get_open_states(
    limit: int | None = None,
) -> list[CircuitBreakerStateData]

Get circuit breaker states currently in OPEN state.

More efficient than get_all_states() for watchdog recovery, which only needs OPEN states. Delegates to repository.get_open_states(), which uses SCAN instead of KEYS in Redis.

Parameters:

  - limit (int | None, default: None): Maximum number of results. None means no limit.

Returns:

  - list[CircuitBreakerStateData]: List of CircuitBreakerStateData with state == OPEN, ordered by opened_at ascending (oldest first).

get_aggregate_failure_rate

get_aggregate_failure_rate() -> float

Return the system-wide circuit-breaker failure fraction (0.0-1.0).

Aggregates over every tracked circuit-breaker state as sum(failure_count) / sum(failure_count + success_count), returning 0.0 when no calls have been recorded across any circuit.

This is a system-wide mean error fraction, not a fixed-time-window or per-service rate: a single failing service among many healthy ones is averaged below threshold, while a broad multi-service failure raises the mean. That makes it suited to a system-wide stability gate, where each individual service is still protected by its own circuit breaker.

Returns:

  - float: Failure fraction in the range 0.0-1.0.
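The aggregation formula above, sum(failure_count) / sum(failure_count + success_count) with 0.0 for no calls, translates directly into code. Counts is a hypothetical stand-in for the per-circuit state. The two example fleets show the averaging effect: one failing service among nine healthy ones yields 0.1, while five services each failing half their calls yield 0.5.

```python
from dataclasses import dataclass


@dataclass
class Counts:
    """Hypothetical stand-in for one circuit's counters."""
    failure_count: int
    success_count: int


def aggregate_failure_rate(states: list[Counts]) -> float:
    failures = sum(s.failure_count for s in states)
    total = sum(s.failure_count + s.success_count for s in states)
    # No calls recorded anywhere: report 0.0 instead of dividing by zero.
    return failures / total if total else 0.0


one_bad_among_many = [Counts(10, 0)] + [Counts(0, 10)] * 9
broad_outage = [Counts(5, 5)] * 5
```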

record_failure

record_failure(
    service_name: str,
    error_context: dict[str, Any] | None = None,
    hint_state: CircuitBreakerStateData | None = None,
) -> None

Record a failure for a service.

This is used for automatic circuit breaker mode. If the threshold is exceeded AND minimum_calls is met, the circuit opens automatically.

Parameters:

  - service_name (str, required): Name of the external service.
  - error_context (dict[str, Any] | None, default: None): Optional context about the failure (for the snapshot).
  - hint_state (CircuitBreakerStateData | None, default: None): Optional pre-fetched state. When the caller already loaded the state via should_allow_with_state, passing it here skips a redundant repository lookup. The hint is used only when its service_name matches; otherwise the method falls through to a fresh fetch. Because failures always increment counters, the hint cannot skip the repository write entirely.
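The automatic opening rule (threshold reached AND minimum_calls met) can be sketched as below. State and the plain-argument signature are hypothetical; hint_state, error_context, and persistence are omitted, and treating "exceeded" as failure_count >= failure_threshold is an assumption. The minimum_calls guard is what stops a few early errors from tripping a barely used circuit.

```python
from dataclasses import dataclass


@dataclass
class State:
    """Hypothetical stand-in for the per-service breaker state."""
    state: str = "closed"
    failure_count: int = 0
    success_count: int = 0


def record_failure(s: State, failure_threshold: int, minimum_calls: int) -> None:
    s.failure_count += 1  # failures always increment counters
    total_calls = s.failure_count + s.success_count  # what get_total_calls reports
    # Open only when the threshold is hit AND enough calls have been seen.
    if s.failure_count >= failure_threshold and total_calls >= minimum_calls:
        s.state = "open"


cold = State()
for _ in range(3):
    record_failure(cold, failure_threshold=3, minimum_calls=10)

warm = State(success_count=7)
for _ in range(3):
    record_failure(warm, failure_threshold=3, minimum_calls=10)
```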

record_success

record_success(
    service_name: str,
    hint_state: CircuitBreakerStateData | None = None,
) -> None

Record a success for a service.

This is used for automatic circuit breaker mode. In half-open state, enough successes will close the circuit.

Parameters:

  - service_name (str, required): Name of the external service.
  - hint_state (CircuitBreakerStateData | None, default: None): Optional pre-fetched state. When the caller already loaded the state via should_allow_with_state, passing it here unlocks two optimizations:
      1. Fast path: when the hint indicates steady-state CLOSED (manually_controlled=False, failure_count=0), the call returns immediately without touching the repository, because the eventual update_state(failure_count=0) would be a no-op.
      2. Slow path: the redundant get_or_create_state repository read is skipped when the hint's service_name matches.
    Stale-hint cases are observably equivalent to the no-hint path: a missed reset is corrected by the next call's slow path.

check_recovery_transitions

check_recovery_transitions() -> dict

Check for circuit breakers that should transition from OPEN to HALF_OPEN.

This method should be called periodically (e.g., every minute) to check if any OPEN circuits have exceeded the recovery timeout and should transition to HALF_OPEN for testing.

Returns:

  - dict: Dictionary with the transitioned service names and their count.
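A sketch of the periodic OPEN to HALF_OPEN sweep, assuming a single recovery_timeout for all services (the real service may use each service's effective config) and timestamps in epoch seconds. The result keys "transitioned" and "count" are hypothetical; the documentation only says the dict holds the names and count.

```python
from dataclasses import dataclass


@dataclass
class StateData:
    """Hypothetical stand-in for CircuitBreakerStateData."""
    service_name: str
    state: str
    opened_at: float  # epoch seconds when the circuit opened


def check_recovery_transitions(states: list[StateData], recovery_timeout: float, now: float) -> dict:
    transitioned = []
    for s in states:
        if s.state == "open" and now - s.opened_at >= recovery_timeout:
            s.state = "half_open"  # allow trial requests to probe the service
            transitioned.append(s.service_name)
    return {"transitioned": transitioned, "count": len(transitioned)}


states = [
    StateData("a", "open", 0.0),
    StateData("b", "open", 50.0),
    StateData("c", "closed", 0.0),
]
summary = check_recovery_transitions(states, recovery_timeout=60.0, now=100.0)
```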

manual_control

manual_control(
    service_name: str,
    action: str,
    reason: str = "",
    controlled_by: Any = None,
) -> CircuitBreakerResult

Manually control a circuit breaker state.

Parameters:

  - service_name (str, required): Name of the service.
  - action (str, required): 'open', 'close', or 'auto'.
  - reason (str, default: ''): Reason for the control action.
  - controlled_by (Any, default: None): User who initiated the action.

Returns:

  - CircuitBreakerResult: CircuitBreakerResult with operation details.

reconcile_cb_cell_mapping

reconcile_cb_cell_mapping() -> dict[str, Any]

Reconcile CB-to-Cell mapping consistency after a Ring Resize.

  1. Iterate over all CBs and extract cell_id from the Composite Key
  2. Compare against the correct cell_id per the current Hash Ring
  3. On mismatch: archive + delete the orphan CB (no state transition)
  4. CBs for new Cells are lazily created by get_or_create()

Returns:

  - dict[str, Any]: {"archived": [...], "errors": [...]}
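The reconciliation steps can be sketched with a minimal consistent-hash ring built from bisect and hashlib. Everything here is an assumption: the composite key format "service:cell_id", the HashRing class (no virtual nodes), the archive/delete callbacks, and recording errors as strings. As the steps describe, orphans are archived then deleted with no state transition, and new cells are not pre-created.

```python
import bisect
import hashlib
from typing import Callable


class HashRing:
    """Minimal consistent-hash ring (hypothetical; no virtual nodes)."""

    def __init__(self, cells: list[str]) -> None:
        self._ring = sorted((self._hash(c), c) for c in cells)
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.sha256(value.encode()).hexdigest(), 16)

    def cell_for(self, service_name: str) -> str:
        # First ring position clockwise from the key's hash, wrapping around.
        idx = bisect.bisect(self._keys, self._hash(service_name)) % len(self._ring)
        return self._ring[idx][1]


def reconcile(
    cb_keys: list[str],
    ring: HashRing,
    archive: Callable[[str], None],
    delete: Callable[[str], None],
) -> dict:
    archived: list[str] = []
    errors: list[str] = []
    for key in cb_keys:
        try:
            service_name, cell_id = key.rsplit(":", 1)  # assumed key format
            if ring.cell_for(service_name) != cell_id:
                # Orphan: archive then delete; no state transition is emitted.
                archive(key)
                delete(key)
                archived.append(key)
        except Exception as exc:
            errors.append(f"{key}: {exc}")
    return {"archived": archived, "errors": errors}
```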

ReplayService

ReplayService(
    repository: FailedOperationRepository | None = None,
    cache: CacheProviderInterface | None = None,
)

Bases: EventEmitterMixin

DLQ Replay Service.

Orchestrates replay operations for failed operations.

Usage

service = ReplayService()

# Single replay
result = service.replay_single(dlq_id="123")

# Batch replay
batch_result = service.replay_batch(
    failure_type="PG_TIMEOUT",
    max_items=50,
)

# For testing with a mock repository
from unittest.mock import Mock

mock_repo = Mock(spec=FailedOperationRepository)
service = ReplayService(repository=mock_repo)

Initialize the replay service.

Parameters:

  - repository (FailedOperationRepository | None, default: None): Optional repository for dependency injection; the Django adapter is used if None.
  - cache (CacheProviderInterface | None, default: None): Optional cache provider for the per-service inflight lock that guards replay_on_circuit_close. If omitted, the provider is lazily resolved via ProviderRegistry on first use. If resolution fails or the resolved provider does not support get_lock(), the guard fails open and logs a WARNING.

repository property

repository: FailedOperationRepository

Get the repository using ProviderRegistry with fallback policy.

cache property

cache: CacheProviderInterface | None

Lazy-resolve the cache provider for the circuit-close inflight lock.

Returns None if no provider can be resolved; the caller fails open in that case. The inflight lock uses cache.get_lock() (an owner-fenced DistributedLock), so adapter-level lock support is validated at acquire time rather than via a separate setnx gate.

Using getattr with defaults accommodates test fixtures that bypass __init__ via ReplayService.__new__(...). For this fail-open guard, an instance created that way behaves identically to a fresh instance constructed with cache=None.
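The fail-open inflight guard can be sketched as a context manager. The lock API is an assumption: FakeCache.get_lock(name) returning an object with acquire(blocking=False) and release() is a hypothetical test double backed by threading.Lock, not the real CacheProviderInterface or DistributedLock signature. A held lock means "duplicate, skip" (which the documented service reports as inflight_skipped=True); a missing or broken cache means "log a WARNING and proceed".

```python
import logging
import threading
from contextlib import contextmanager
from typing import Any, Iterator

logger = logging.getLogger("replay_inflight_sketch")


class FakeCache:
    """Test double: one threading.Lock per name (hypothetical API)."""

    def __init__(self) -> None:
        self._locks: dict[str, Any] = {}

    def get_lock(self, name: str) -> Any:
        return self._locks.setdefault(name, threading.Lock())


@contextmanager
def inflight_guard(cache: Any, service_name: str) -> Iterator[bool]:
    """Yield True if this caller may replay, False if a duplicate is in flight."""
    lock = None
    try:
        if cache is None:
            raise RuntimeError("no cache provider resolved")
        lock = cache.get_lock(f"replay-inflight:{service_name}")
        acquired = lock.acquire(blocking=False)
    except Exception as exc:
        # Fail open: log a WARNING and let the replay proceed unguarded.
        logger.warning("inflight lock unavailable for %s: %s", service_name, exc)
        lock, acquired = None, True
    if not acquired:
        yield False  # another caller already holds the lock for this service
        return
    try:
        yield True
    finally:
        if lock is not None:
            lock.release()
```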

replay_single

replay_single(dlq_id: str) -> ReplayResult

Replay a single DLQ entry.

This method uses atomic acquisition to prevent race conditions when multiple workers try to replay the same entry simultaneously.

Safety checks (via check_all_governance):

  1. Kill Switch: system-wide deactivation check
  2. Emergency Level: blocked at LEVEL_2+ to protect resources
  3. ErrorBudgetGate: automation blocked when the error budget is exhausted

Audit logging: blocks are automatically recorded in the AuditLog.
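The three governance checks and the audit-on-block behavior can be sketched as an ordered gate. The check order follows the list above; EmergencyLevel members other than LEVEL_2, GovernanceCheck, the plain boolean/float inputs, and the list-based audit log are hypothetical stand-ins for the real check_all_governance and AuditLog.

```python
from dataclasses import dataclass
from enum import IntEnum


class EmergencyLevel(IntEnum):
    """Hypothetical levels; only LEVEL_2 appears in the documentation."""
    NORMAL = 0
    LEVEL_1 = 1
    LEVEL_2 = 2
    LEVEL_3 = 3


@dataclass(frozen=True)
class GovernanceCheck:
    allowed: bool
    reason: str = ""


def check_all_governance(
    kill_switch_on: bool,
    level: EmergencyLevel,
    budget_remaining: float,
    audit_log: list[str],
) -> GovernanceCheck:
    if kill_switch_on:
        result = GovernanceCheck(False, "kill switch active")
    elif level >= EmergencyLevel.LEVEL_2:
        result = GovernanceCheck(False, f"emergency level {level.name}")
    elif budget_remaining <= 0:
        result = GovernanceCheck(False, "error budget exhausted")
    else:
        return GovernanceCheck(True)
    audit_log.append(result.reason)  # every block is audited
    return result
```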

Parameters:

  - dlq_id (str, required): ID of the FailedOperation to replay.

Returns:

  - ReplayResult: ReplayResult indicating success or failure.

replay_batch

replay_batch(
    domain: str | None = None,
    failure_type: str | None = None,
    max_items: int = 100,
    use_adaptive: bool | None = None,
    use_priority: bool | None = None,
) -> BatchReplayResult

Replay multiple DLQ entries matching criteria.

Safety checks (via check_all_governance):

  1. Kill Switch: system-wide deactivation check
  2. Emergency Level: blocked at LEVEL_2+ to protect resources
  3. ErrorBudgetGate: automation blocked when the error budget is exhausted

Adaptive mode:

  - When adaptive_enabled=True in RuntimeConfig, the batch size is dynamic.
  - A high failure rate (>=20%) reduces the batch size by 20%.
  - 3 consecutive perfect batches increase the batch size by 5.
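The adaptive rules can be sketched as a small stateful sizer. The >=20% shrink by 20% and the +5 after three perfect batches come from the rules above; the min/max bounds, resetting the streak after growth, and resetting it on any partial failure are assumptions, and AdaptiveBatchSizer is a hypothetical name.

```python
class AdaptiveBatchSizer:
    """Sketch of the documented adaptive rules; bounds are assumptions."""

    def __init__(self, initial: int = 50, min_size: int = 5, max_size: int = 500) -> None:
        self.size = initial
        self.min_size = min_size
        self.max_size = max_size
        self._perfect_streak = 0

    def record_batch(self, processed: int, failed: int) -> int:
        failure_rate = failed / processed if processed else 0.0
        if failure_rate >= 0.20:
            # High failure rate: shrink by 20%.
            self.size = max(self.min_size, int(self.size * 0.8))
            self._perfect_streak = 0
        elif failed == 0 and processed > 0:
            self._perfect_streak += 1
            if self._perfect_streak == 3:
                # Three perfect batches in a row: grow by 5.
                self.size = min(self.max_size, self.size + 5)
                self._perfect_streak = 0
        else:
            # Some failures, but below the threshold: just break the streak.
            self._perfect_streak = 0
        return self.size
```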

Priority mode:

  - When priority_enabled=True in RuntimeConfig, domains are processed by priority.
  - Critical domains are processed first, then normal, then low.
  - Domain-specific max_retries overrides are respected.
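The priority ordering can be sketched with a stable sort over a domain-to-priority map. The map shape, placing unknown priorities last, and effective_max_retries are hypothetical; because Python's sorted() is stable, domains sharing a priority keep their input order.

```python
PRIORITY_ORDER = {"critical": 0, "normal": 1, "low": 2}


def order_domains(domain_priority: dict[str, str]) -> list[str]:
    # Critical first, then normal, then low; unknown priorities sort last (assumption).
    return sorted(
        domain_priority,
        key=lambda d: PRIORITY_ORDER.get(domain_priority[d], len(PRIORITY_ORDER)),
    )


def effective_max_retries(domain: str, default: int, overrides: dict[str, int]) -> int:
    # A domain-specific override wins over the global default.
    return overrides.get(domain, default)
```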

Audit logging: blocks are automatically recorded in the AuditLog.

Parameters:

  - domain (str | None, default: None): Filter by domain (optional; ignored in priority mode).
  - failure_type (str | None, default: None): Filter by failure type (optional).
  - max_items (int, default: 100): Maximum number of items to replay (ignored in adaptive mode).
  - use_adaptive (bool | None, default: None): Override the adaptive mode setting (None = use RuntimeConfig).
  - use_priority (bool | None, default: None): Override the priority mode setting (None = use RuntimeConfig).

Returns:

  - BatchReplayResult: BatchReplayResult with summary and individual results.

replay_on_circuit_close

replay_on_circuit_close(
    service_name: str,
    max_items: int = 50,
    escalate_failures: bool = True,
    service_failure_type_map: (
        dict[str, list[str]] | None
    ) = None,
) -> BatchReplayResult

Replay entries when circuit breaker closes.

This is triggered when an external service recovers. Only replays entries related to the recovered service.

IMPORTANT: When triggered by force_close with trigger_replay=True, any replay failures are escalated to REQUIRES_REVIEW status. This is because operator-initiated recovery implies the operator intended to resolve these items, so failures need explicit attention.

Parameters:

  - service_name (str, required): Name of the service that recovered.
  - max_items (int, default: 50): Maximum number of items to replay.
  - escalate_failures (bool, default: True): If True, mark failed replays as REQUIRES_REVIEW.
  - service_failure_type_map (dict[str, list[str]] | None, default: None): Custom mapping of service names to failure types. If None, the RuntimeConfig fallback is used. Example: {"my_service": ["TIMEOUT", "CONNECTION_ERROR"]}

Returns:

  - BatchReplayResult: BatchReplayResult with summary. inflight_skipped=True indicates the per-service inflight lock rejected this call as a duplicate.
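How service_failure_type_map narrows the replay set and how escalate_failures marks failures can be sketched as below. Entry, the PENDING/REPLAYED statuses, the replay callback, and the (ok, failed) return are hypothetical; only REQUIRES_REVIEW and the map shape come from the description above. Governance checks and the inflight lock are left out.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Entry:
    """Hypothetical DLQ entry."""
    dlq_id: str
    failure_type: str
    status: str = "PENDING"


def select_for_service(
    entries: list[Entry], service_name: str, failure_type_map: dict[str, list[str]], max_items: int
) -> list[Entry]:
    # Only failure types mapped to the recovered service are candidates.
    wanted = set(failure_type_map.get(service_name, []))
    return [e for e in entries if e.failure_type in wanted][:max_items]


def replay_on_close(
    entries: list[Entry],
    service_name: str,
    failure_type_map: dict[str, list[str]],
    replay: Callable[[Entry], bool],
    escalate_failures: bool = True,
    max_items: int = 50,
) -> tuple[int, int]:
    ok = failed = 0
    for entry in select_for_service(entries, service_name, failure_type_map, max_items):
        if replay(entry):
            entry.status = "REPLAYED"
            ok += 1
        else:
            failed += 1
            if escalate_failures:
                # Operator-initiated recovery: failures need explicit attention.
                entry.status = "REQUIRES_REVIEW"
    return ok, failed
```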

BatchReplayResult dataclass

BatchReplayResult(
    total: int = 0,
    success_count: int = 0,
    failed_count: int = 0,
    skipped_count: int = 0,
    results: list[ReplayResult] = list(),
    governance_blocked: bool = False,
    governance_block_reason: str = "",
    inflight_skipped: bool = False,
    priority_used: bool = False,
    domains_processed: list[str] | None = None,
)

Result of a batch replay operation.

ReplayResult dataclass

ReplayResult(
    success: bool,
    dlq_id: str,
    message: str = "",
    error: str | None = None,
    data: dict[str, Any] | None = None,
    skipped: bool = False,
)

Result of a replay operation.

succeeded classmethod

succeeded(
    dlq_id: str, message: str = "", data: dict | None = None
) -> ReplayResult

Factory for successful replay.

failed classmethod

failed(dlq_id: str, error: str) -> ReplayResult

Factory for failed replay.

skipped_result classmethod

skipped_result(
    dlq_id: str, reason: str = ""
) -> ReplayResult

Factory for idempotency-skipped replay.

blocked classmethod

blocked(
    dlq_id: str, governance_result: GovernanceCheckResult
) -> ReplayResult

Factory for governance-blocked replay.