baldur.interfaces — Core State Repositories

The two highest-traffic repository interfaces: failed-operation persistence and circuit-breaker state. Adapter authors implement these to back Baldur on a new storage layer. The shared enums and DTOs live on the data-model page.

FailedOperationRepository

Bases: ABC

Abstract repository for FailedOperation (DLQ) data access.

Concrete implementations:

- InMemoryFailedOperationRepository: in-process dict storage
- SQLFailedOperationRepository: any DB-API 2.0 database
- RedisDLQRepository: Redis via ResilientStorageBackend

create abstractmethod

create(
    domain: str,
    failure_type: str,
    error_message: str = "",
    error_code: str = "",
    entity_type: str | None = None,
    entity_id: str | None = None,
    entity_refs: dict[str, Any] | None = None,
    user_id: int | None = None,
    snapshot_data: dict[str, Any] | None = None,
    request_data: dict[str, Any] | None = None,
    response_data: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    retry_count: int = 0,
    max_retries: int = 2,
    next_action_hint: str = "",
    recommended_action: str = "",
    expires_at: datetime | None = None,
) -> FailedOperationData

Create a new failed operation record

get_by_id abstractmethod

get_by_id(id: str) -> FailedOperationData | None

Get a failed operation by ID

get_pending_by_domain abstractmethod

get_pending_by_domain(
    domain: str, limit: int = 100
) -> list[FailedOperationData]

Get pending operations for a specific domain

get_pending_count_by_domain abstractmethod

get_pending_count_by_domain(domain: str) -> int

Get count of pending operations for a domain

update_status abstractmethod

update_status(
    id: str,
    status: str,
    resolution_type: str = "",
    resolution_note: str = "",
    resolved_by_id: int | None = None,
    recommended_action: str = "",
) -> bool

Update the status of a failed operation.

recommended_action: Suggested operator action (e.g., "escalate", "manual_check", "replay"). Empty string preserves the existing value.

increment_retry_count abstractmethod

increment_retry_count(id: str) -> bool

Increment retry count and update last_retry_at

mark_as_resolved abstractmethod

mark_as_resolved(
    id: str,
    resolution_type: str,
    resolution_note: str = "",
    resolved_by_id: int | None = None,
) -> bool

Mark a failed operation as resolved

get_expired_operations abstractmethod

get_expired_operations(
    before_date: datetime, limit: int = 100
) -> list[FailedOperationData]

Get operations that have expired

bulk_update_status abstractmethod

bulk_update_status(ids: list[str], status: str) -> int

Bulk update status for multiple operations

find_by_status abstractmethod

find_by_status(
    status: str,
    domain: str | None = None,
    failure_type: str | None = None,
    limit: int = 100,
) -> list[FailedOperationData]

Find operations by status with optional filters.

Distinct contract from find: positional status is required, results are ordered created_at ASC, and there is no offset. Used by replay/SLA paths that consume a single status oldest-first.

find abstractmethod

find(
    *,
    status: str | None = None,
    domain: str | None = None,
    failure_type: str | None = None,
    offset: int = 0,
    limit: int = 100
) -> list[FailedOperationData]

Paginated cross-status query with optional filters.

Results ordered by created_at DESC (newest-first). No filter returns all statuses — the default scope is "no filter = all", so escalated/terminal statuses (permanently_failed, requires_review) are visible by default. Mirrors the sibling archive repositories' find(*, ..., offset, limit) contract.
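The difference between the find_by_status and find contracts is easiest to see side by side. The following is a minimal in-memory sketch, not the library's implementation: the Op dataclass is a hypothetical stand-in for FailedOperationData, the functions take the row list as an explicit first argument, and the failure_type filter is omitted for brevity.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Op:  # hypothetical stand-in for FailedOperationData
    id: str
    status: str
    domain: str
    created_at: datetime


def find_by_status(ops, status, domain=None, limit=100):
    # Required status, oldest-first (created_at ASC), no offset.
    rows = [o for o in ops
            if o.status == status and (domain is None or o.domain == domain)]
    rows.sort(key=lambda o: o.created_at)
    return rows[:limit]


def find(ops, *, status=None, domain=None, offset=0, limit=100):
    # Optional filters, newest-first (created_at DESC), offset/limit paging.
    rows = [o for o in ops
            if (status is None or o.status == status)
            and (domain is None or o.domain == domain)]
    rows.sort(key=lambda o: o.created_at, reverse=True)
    return rows[offset:offset + limit]


t0 = datetime(2024, 1, 1)
ops = [
    Op("a", "pending", "billing", t0),
    Op("b", "requires_review", "billing", t0 + timedelta(minutes=1)),
    Op("c", "pending", "orders", t0 + timedelta(minutes=2)),
]

oldest_first = [o.id for o in find_by_status(ops, "pending")]  # ['a', 'c']
newest_first = [o.id for o in find(ops)]                       # ['c', 'b', 'a']
page_two = [o.id for o in find(ops, offset=1, limit=1)]        # ['b']
```

Note that the unfiltered find call includes the requires_review entry, matching the "no filter = all" default scope.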

count abstractmethod

count(
    *,
    status: str | None = None,
    domain: str | None = None,
    failure_type: str | None = None
) -> int

Count operations matching filters (no filter = all statuses).

count_created_in_window abstractmethod

count_created_in_window(
    start: datetime, end: datetime
) -> int

Count operations whose created_at falls within the inclusive window [start, end].

Counts every status (no status scope): an entry created in the window that was later resolved/archived still consumed budget when it failed, so the windowed inflow count must not be status-scoped. Powers the Error Budget windowed inflow source.
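As an illustration of the two rules above (both bounds inclusive, no status scoping), here is a small sketch over plain tuples. The (status, created_at) row shape and the explicit rows argument are assumptions made for the example, not part of the interface.

```python
from datetime import datetime


def count_created_in_window(rows, start, end):
    # rows: iterable of (status, created_at) pairs (hypothetical shape).
    # Status is deliberately ignored; both window bounds are inclusive.
    return sum(1 for _status, created_at in rows if start <= created_at <= end)


rows = [
    ("pending", datetime(2024, 1, 1, 0, 0)),    # exactly on the start bound
    ("resolved", datetime(2024, 1, 1, 12, 0)),  # resolved later, still counted
    ("archived", datetime(2024, 1, 2, 0, 0)),   # exactly on the end bound
    ("pending", datetime(2024, 1, 2, 0, 1)),    # one minute past the window
]

inflow = count_created_in_window(rows, datetime(2024, 1, 1), datetime(2024, 1, 2))
```

Here inflow is 3: the resolved and archived rows count toward inflow, and only the row created after the end bound is excluded.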

find_replayable abstractmethod

find_replayable(
    max_retries: int,
    domain: str | None = None,
    failure_type: str | None = None,
    limit: int = 100,
) -> list[FailedOperationData]

Find operations that can be replayed (pending and retry_count < max_retries)

find_sla_breached abstractmethod

find_sla_breached(
    current_time: datetime,
    sla_thresholds: dict[str, timedelta],
) -> list[FailedOperationData]

Find operations that have breached their SLA

find_expired abstractmethod

find_expired(
    current_time: datetime,
) -> list[FailedOperationData]

Find operations past their retention period

get_statistics abstractmethod

get_statistics() -> dict[str, Any]

Get statistics about failed operations

get_facet_counts abstractmethod

get_facet_counts(
    *, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]

Faceted status×domain counts for the admin-console DLQ filter.

Returns {"by_status": {status: n, ...}, "by_domain": {domain: n, ...}} with zero-count buckets dropped.

Standard faceted-search semantics: each facet excludes its own selection — by_status is scoped by the domain filter and by_domain is scoped by the status filter, so the dimension being chosen keeps all of its options. An unfiltered call returns the complete by_status + by_domain. Matching on both dimensions is exact (no substring/fuzzy).
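The "each facet excludes its own selection" rule can be sketched in a few lines. This is an illustrative in-memory version under assumed inputs: rows is a hypothetical list of (status, domain) pairs passed in explicitly, whereas a real adapter would query its own storage.

```python
from collections import Counter


def get_facet_counts(rows, *, status=None, domain=None):
    # by_status is scoped only by the domain filter, and by_domain only by
    # the status filter, so the dimension being chosen keeps all its options.
    by_status = Counter(s for s, d in rows if domain is None or d == domain)
    by_domain = Counter(d for s, d in rows if status is None or s == status)
    # Counter only holds keys that were seen, so zero-count buckets never appear.
    return {"by_status": dict(by_status), "by_domain": dict(by_domain)}


rows = [
    ("pending", "billing"),
    ("pending", "orders"),
    ("resolved", "billing"),
]

facets = get_facet_counts(rows, status="pending")
# facets["by_status"] still lists "resolved", because the status filter does
# not scope its own facet; facets["by_domain"] counts pending rows only.
```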

try_acquire_for_replay abstractmethod

try_acquire_for_replay(
    id: str, max_retries: int, force: bool = False
) -> FailedOperationData | None

Atomically acquire a DLQ entry for replay.

Normal mode (force=False) MUST:

  1. Check that status is PENDING and retry_count < max_retries.
  2. If eligible, atomically set status to REPLAYING and increment retry_count.
  3. Return the FailedOperationData if acquired, None if not eligible.

Force mode (force=True) is the operator-driven cap-override escape hatch for re-driving an at-cap entry after a root-cause fix. It MUST:

  1. Accept status in {PENDING, REQUIRES_REVIEW} and reject every other status (RESOLVED / ARCHIVED / REPLAYING / REVIEWING / ... -> None).
  2. Skip the retry_count < max_retries check entirely.
  3. Reset retry_count to a fresh budget (the redrive attempt is attempt 1 -> retry_count == 1 after acquire), so the entry becomes an ordinary under-cap entry for all downstream lifecycle logic.
  4. Before resetting, stamp the persisted metadata with previous_total_retries (the pre-reset count accumulated across prior force-redrives) and force_redrive_count (incremented), so the budget reset preserves the forensic scar.
  5. Atomically set status to REPLAYING.

The poison-pill convergence guarantee is preserved: a still-broken force-redriven entry re-converges to REQUIRES_REVIEW within max_retries further automatic attempts via complete_replay.

Implementation should use row-level locking (SELECT FOR UPDATE) or optimistic locking (version/updated_at check) to prevent race conditions.

Parameters:

Name Type Description Default
id str

The DLQ entry ID to acquire

required
max_retries int

Maximum allowed retry attempts

required
force bool

Operator cap-override — bypass the cap gate and accept an at-cap REQUIRES_REVIEW entry (see Force mode above)

False

Returns:

Type Description
FailedOperationData | None

FailedOperationData if successfully acquired, None otherwise

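Example Django implementation (normal mode):

    from django.db import transaction
    from django.utils.timezone import now

    def try_acquire_for_replay(self, id, max_retries):
        with transaction.atomic():
            # Lock the row so concurrent workers cannot acquire the same entry.
            entry = FailedOperation.objects.select_for_update().filter(id=id).first()
            # A missing entry is treated as "not eligible" rather than raising.
            if entry is None or entry.status != 'pending' or entry.retry_count >= max_retries:
                return None
            entry.status = 'replaying'
            entry.retry_count += 1
            entry.last_retry_at = now()
            entry.save()
            return FailedOperationData.from_model(entry)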
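For adapters without a database, the same contract can be sketched in memory, covering both normal and force mode. This is an illustration under assumptions, not the shipped InMemoryFailedOperationRepository: Entry is a hypothetical stand-in for FailedOperationData, status values are written as lowercase strings, and previous_total_retries is read here as a running total across redrives.

```python
import threading
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Entry:  # hypothetical stand-in for FailedOperationData
    id: str
    status: str = "pending"
    retry_count: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)


class InMemoryStore:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._entries: Dict[str, Entry] = {}

    def add(self, entry: Entry) -> None:
        self._entries[entry.id] = entry

    def try_acquire_for_replay(
        self, id: str, max_retries: int, force: bool = False
    ) -> Optional[Entry]:
        # The eligibility check and the write happen under one lock, which
        # plays the role of SELECT FOR UPDATE in a SQL adapter.
        with self._lock:
            entry = self._entries.get(id)
            if entry is None:
                return None
            if force:
                if entry.status not in ("pending", "requires_review"):
                    return None
                md = entry.metadata
                # Stamp the forensic fields before the budget is reset.
                md["previous_total_retries"] = (
                    md.get("previous_total_retries", 0) + entry.retry_count
                )
                md["force_redrive_count"] = md.get("force_redrive_count", 0) + 1
                entry.retry_count = 1  # the redrive itself is attempt 1
            else:
                if entry.status != "pending" or entry.retry_count >= max_retries:
                    return None
                entry.retry_count += 1
            entry.status = "replaying"
            return entry


store = InMemoryStore()
store.add(Entry("op-1", status="requires_review", retry_count=3))

normal = store.try_acquire_for_replay("op-1", max_retries=3)              # None: at cap
forced = store.try_acquire_for_replay("op-1", max_retries=3, force=True)  # acquired
again = store.try_acquire_for_replay("op-1", max_retries=3, force=True)   # None: REPLAYING
```

After the forced acquire the entry is in the replaying state with retry_count == 1, so downstream logic can treat it like any other under-cap entry.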

complete_replay abstractmethod

complete_replay(
    id: str,
    success: bool,
    resolution_type: str = "",
    note: str = "",
    resolved_by_id: int | None = None,
    error_details: dict[str, Any] | None = None,
) -> bool

Complete a replay operation by updating the final status.

Should be called after replay execution to set final state:

- success=True: Mark as RESOLVED with resolution details
- success=False: Revert to PENDING (for retry) or REQUIRES_REVIEW (if escalated)

This method is safe to call without transaction wrapper as it only updates an already-acquired entry.

Parameters:

Name Type Description Default
id str

The DLQ entry ID

required
success bool

Whether the replay succeeded

required
resolution_type str

Type of resolution (for successful replays)

''
note str

Resolution note or error message

''
resolved_by_id int | None

User ID who resolved (None for system)

None
error_details dict[str, Any] | None

Additional error context (for failed replays)

None

Returns:

Type Description
bool

True if update succeeded, False otherwise

release_stale_replaying abstractmethod

release_stale_replaying(
    older_than_minutes: int = 30,
) -> int

Release DLQ entries stuck in REPLAYING state.

Entries can get stuck if the replay process crashes after acquiring but before completing. This method reverts them to PENDING for retry.

Parameters:

Name Type Description Default
older_than_minutes int

Consider entries older than this as stale

30

Returns:

Type Description
int

Number of entries released
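A minimal sketch of the stale-release sweep follows. It assumes staleness is measured from last_retry_at (the timestamp the acquire step sets in the Django example above); the dict-based entries and the injectable now argument are hypothetical conveniences for the example.

```python
from datetime import datetime, timedelta, timezone


def release_stale_replaying(entries, older_than_minutes=30, now=None):
    # entries: list of dicts with "status" and "last_retry_at" keys
    # (hypothetical shape). `now` is injectable so the sweep can be tested.
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=older_than_minutes)
    released = 0
    for entry in entries:
        if entry["status"] == "replaying" and entry["last_retry_at"] < cutoff:
            entry["status"] = "pending"  # hand the entry back for retry
            released += 1
    return released


now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
entries = [
    {"id": "a", "status": "replaying", "last_retry_at": now - timedelta(minutes=45)},
    {"id": "b", "status": "replaying", "last_retry_at": now - timedelta(minutes=5)},
    {"id": "c", "status": "pending", "last_retry_at": now - timedelta(hours=3)},
]

released = release_stale_replaying(entries, older_than_minutes=30, now=now)
```

Only entry "a" is released: "b" is still within the 30-minute window and "c" was never in REPLAYING.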

archive_old_resolved abstractmethod

archive_old_resolved(older_than_days: int = 30) -> int

Archive resolved entries older than N days.

Changes status from RESOLVED to ARCHIVED. This is a soft-delete operation - data is preserved.

Parameters:

Name Type Description Default
older_than_days int

Archive entries resolved more than this many days ago

30

Returns:

Type Description
int

Number of entries archived

purge_archived abstractmethod

purge_archived(
    ids: list[str] | None = None,
    older_than_days: int | None = None,
) -> int

Permanently delete archived entries.

IMPORTANT: This is a destructive operation. Only archived entries can be purged. Either specify IDs or older_than_days, not both.

With neither argument the call is a no-op (returns 0): a destructive purge with no selection criteria deletes nothing (fail-safe default). To purge every archived entry, pass older_than_days=0 ("older than 0 days" matches all archived entries).

Parameters:

Name Type Description Default
ids list[str] | None

Specific entry IDs to purge (must be ARCHIVED status)

None
older_than_days int | None

Purge archived entries older than N days; 0 purges all archived entries

None

Returns:

Type Description
int

Number of entries permanently deleted

Raises:

Type Description
ValueError

If trying to purge non-archived entries
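The selection rules for purge_archived can be sketched as below. Several details are assumptions rather than documented behavior: passing both arguments raises ValueError, unknown IDs are skipped silently, and age is measured from a hypothetical archived_at field.

```python
from datetime import datetime, timedelta, timezone


def purge_archived(entries, ids=None, older_than_days=None, now=None):
    # entries: dict of id -> {"status": ..., "archived_at": ...} (hypothetical).
    if ids is not None and older_than_days is not None:
        # Assumption: the "not both" rule is enforced with an error.
        raise ValueError("pass ids or older_than_days, not both")
    if ids is None and older_than_days is None:
        return 0  # fail-safe: no selection criteria deletes nothing
    if ids is not None:
        targets = [i for i in dict.fromkeys(ids) if i in entries]
        # Validate every target before deleting anything.
        if any(entries[i]["status"] != "archived" for i in targets):
            raise ValueError("only archived entries can be purged")
    else:
        cutoff = (now or datetime.now(timezone.utc)) - timedelta(days=older_than_days)
        targets = [
            i for i, e in entries.items()
            if e["status"] == "archived" and e["archived_at"] <= cutoff
        ]
    for i in targets:
        del entries[i]
    return len(targets)


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
entries = {
    "a": {"status": "archived", "archived_at": now - timedelta(days=100)},
    "b": {"status": "archived", "archived_at": now - timedelta(days=1)},
    "c": {"status": "resolved", "archived_at": None},
}

noop = purge_archived(entries, now=now)                      # 0, nothing selected
old = purge_archived(entries, older_than_days=90, now=now)   # 1, removes "a"
```

Calling purge_archived(entries, older_than_days=0, now=now) afterwards would remove "b" as well, while the resolved entry "c" is never touched by the age-based path.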

get_cleanup_stats abstractmethod

get_cleanup_stats() -> dict[str, Any]

Get statistics for cleanup operations.

Returns:

Type Description
dict[str, Any]

Dict with counts by status, age distributions, etc.

Example:

    {
        "total": 1500,
        "by_status": {
            "pending": 50,
            "resolved": 1200,
            "archived": 250,
        },
        "resolved_older_than_30_days": 800,
        "archived_older_than_90_days": 100,
    }

count_archived_older_than abstractmethod

count_archived_older_than(older_than_days: int) -> int

Count archived entries older than N days.

Pushes counting to the repository layer where SQL adapters can use SELECT COUNT(*) instead of loading objects into memory.

Parameters:

Name Type Description Default
older_than_days int

Count archived entries resolved more than this many days ago

required

Returns:

Type Description
int

Number of matching archived entries
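To make the SELECT COUNT(*) point concrete, here is a sketch against an in-memory SQLite database. The failed_operation table, its columns, and the injectable now argument are hypothetical; the actual SQLFailedOperationRepository schema is not shown on this page.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def count_archived_older_than(conn, older_than_days, now=None):
    cutoff = (now or datetime.now(timezone.utc)) - timedelta(days=older_than_days)
    # The database does the counting; no rows are loaded into Python objects.
    row = conn.execute(
        "SELECT COUNT(*) FROM failed_operation "
        "WHERE status = 'archived' AND resolved_at < ?",
        (cutoff.isoformat(),),
    ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE failed_operation "
    "(id TEXT PRIMARY KEY, status TEXT, resolved_at TEXT)"
)
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
# Timestamps are stored as ISO-8601 UTC strings, which sort chronologically.
conn.executemany(
    "INSERT INTO failed_operation VALUES (?, ?, ?)",
    [
        ("a", "archived", (now - timedelta(days=120)).isoformat()),
        ("b", "archived", (now - timedelta(days=10)).isoformat()),
        ("c", "resolved", (now - timedelta(days=120)).isoformat()),
    ],
)

stale = count_archived_older_than(conn, 90, now=now)
```

Only row "a" matches: "b" is archived but too recent, and "c" is old but not archived.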

count_all abstractmethod

count_all() -> int

Return active DLQ item count (excludes resolved/rejected/archived).

count_by_domain abstractmethod

count_by_domain(domain: str) -> int

Return DLQ item count for a specific domain.

get_oldest_ids abstractmethod

get_oldest_ids(
    count: int, domain: str | None = None
) -> list[str]

Return IDs of the oldest items (by score/timestamp).

delete abstractmethod

delete(entry_id: str) -> bool

Delete a single DLQ entry by ID. Returns True if deleted.

evict_oldest abstractmethod

evict_oldest(count: int, domain: str | None = None) -> int

Delete the oldest items. Returns number of items actually deleted.

compress_and_evict_oldest

compress_and_evict_oldest(
    count: int, domain: str | None = None
) -> int

Summarize then evict oldest items. Default delegates to evict_oldest.

store_compressed_entry

store_compressed_entry(entry: DLQCompressedEntry) -> bool

Store a compressed summary entry. Returns True if stored.

get_compressed_entries

get_compressed_entries(
    domain: str | None = None,
    status: str | None = None,
    limit: int = 100,
) -> list[DLQCompressedEntry]

Return compressed DLQ entries, optionally filtered.

get_compressed_summary

get_compressed_summary() -> dict[str, Any]

Return aggregate statistics of compressed entries.

update_compressed_status

update_compressed_status(
    entry_id: str, new_status: str
) -> bool

Transition compressed entry status. Returns True if updated.

CircuitBreakerStateRepository

Bases: ABC

Abstract repository for CircuitBreakerState data access.

Manages circuit breaker state persistence and retrieval.

get_or_create abstractmethod

get_or_create(service_name: str) -> CircuitBreakerStateData

Get existing state or create new one for a service

get_by_service_name abstractmethod

get_by_service_name(
    service_name: str,
) -> CircuitBreakerStateData | None

Get circuit breaker state by service name

update_state abstractmethod

update_state(
    service_name: str,
    state: str,
    failure_count: int | None = None,
    success_count: int | None = None,
    opened_at: datetime | None = None,
    last_failure_at: datetime | None = None,
    half_open_request_count: int | None = None,
    reset_half_open_count: bool = False,
) -> bool

Update circuit breaker state.

Parameters:

Name Type Description Default
service_name str

Service identifier

required
state str

New state (closed, open, half_open)

required
failure_count int | None

Optional failure count

None
success_count int | None

Optional success count

None
opened_at datetime | None

Optional time when circuit was opened

None
half_open_request_count int | None

Optional half-open request counter value

None
reset_half_open_count bool

If True, atomically clear the half-open counter and watermark in the same write. Used by cold-path transitions out of HALF_OPEN (record_failure, record_success, force_open, force_close) so the state change and counter reset commit in a single round-trip.

False

Returns:

Type Description
bool

True on success

record_failure abstractmethod

record_failure(
    service_name: str,
) -> CircuitBreakerStateData

Record a failure and return updated state

record_success abstractmethod

record_success(
    service_name: str,
) -> CircuitBreakerStateData

Record a success and return updated state

record_success_with_close_check

record_success_with_close_check(
    service_name: str, success_threshold: int
) -> CircuitBreakerCloseAttempt

Record a success and atomically check whether to close the circuit.

Race-unsafe default implementation: invokes record_success followed by a separate update_state call when the threshold is met. Adapters that can perform the read-decide-write atomically (e.g., InMemory under a single lock acquire, Redis via Lua, SQL under a transaction) MUST override this to close the TOCTOU window that allows multiple callers to each observe a passing threshold and emit duplicate CIRCUIT_BREAKER_CLOSED events for the same logical transition.

The race-unsafe default exists so non-InMemory adapters compile and function without change — at the cost of retaining the duplicate-emit race they previously had. Their distributed-safe override is tracked as the "Distributed Redis-backed CB race" out-of-scope item.

Parameters:

Name Type Description Default
service_name str

Circuit breaker identifier.

required
success_threshold int

Number of HALF_OPEN successes required to transition to CLOSED. Comes from CircuitBreakerConfig.

required

Returns:

Type Description
CircuitBreakerCloseAttempt

CircuitBreakerCloseAttempt(state, did_close). did_close is True only for the single caller that crossed the threshold under the adapter's atomicity guarantee; concurrent stale-view callers see did_close=False.
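An atomic override can be sketched for an in-memory adapter by doing the increment, the threshold check, and the state write under a single lock. State and CloseAttempt are hypothetical stand-ins for CircuitBreakerStateData and CircuitBreakerCloseAttempt, and new breakers start in half_open purely to keep the demo short.

```python
import threading
from dataclasses import dataclass


@dataclass
class State:  # hypothetical stand-in for CircuitBreakerStateData
    state: str = "half_open"
    success_count: int = 0


@dataclass
class CloseAttempt:  # hypothetical stand-in for CircuitBreakerCloseAttempt
    state: State
    did_close: bool


class InMemoryBreakers:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._states = {}

    def record_success_with_close_check(self, service_name, success_threshold):
        # Read, decide, and write under one lock so only one caller can
        # observe the threshold crossing.
        with self._lock:
            s = self._states.setdefault(service_name, State())
            s.success_count += 1
            did_close = (
                s.state == "half_open" and s.success_count >= success_threshold
            )
            if did_close:
                s.state = "closed"
            # Return a copy so callers cannot mutate the stored state.
            return CloseAttempt(State(s.state, s.success_count), did_close)


repo = InMemoryBreakers()
results = []


def worker():
    results.append(repo.record_success_with_close_check("payments", 3))


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

closes = sum(r.did_close for r in results)
```

With eight concurrent successes and a threshold of three, closes is exactly 1, so the service would emit a single CIRCUIT_BREAKER_CLOSED event.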

set_manual_control abstractmethod

set_manual_control(
    service_name: str,
    state: str,
    controlled_by_id: int | None = None,
    reason: str = "",
    expires_at: datetime | None = None,
) -> bool

Set manual control on a circuit breaker

clear_manual_control abstractmethod

clear_manual_control(
    service_name: str, preserve_reason: bool = False
) -> bool

Clear manual control from a circuit breaker

Parameters:

Name Type Description Default
service_name str

Name of the service

required
preserve_reason bool

If True, keep the existing control_reason value

False

get_all_states abstractmethod

get_all_states() -> list[CircuitBreakerStateData]

Get all circuit breaker states.

Scale bound: intended for OSS / PRO deployments where the total circuit-breaker count is well under ~1K (typically <= a few hundred). Used by admin dashboards and IPC snapshots that genuinely want a full picture. Callers needing larger result sets, or needing only a subset, should use get_open_states(limit=...) or filter via a future paginated API instead of growing this method.

get_open_states

get_open_states(
    limit: int | None = None,
) -> list[CircuitBreakerStateData]

Get circuit breaker states in OPEN state.

Intended to be more efficient than get_all_states() + filter for large keyspaces once an adapter overrides it. The default implementation simply filters get_all_states(); adapters may override it with an optimized query (e.g., SCAN instead of KEYS in Redis).

Parameters:

Name Type Description Default
limit int | None

Maximum number of results. None means no limit.

None

Returns:

Type Description
list[CircuitBreakerStateData]

List of CircuitBreakerStateData with state == OPEN, ordered by opened_at ascending (oldest first).
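The default behavior described above (filter, order by opened_at, then apply the limit) might look like the following sketch. State is a hypothetical stand-in for CircuitBreakerStateData, the full state list is passed in explicitly instead of being read from get_all_states(), and open entries are assumed to always carry an opened_at value.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class State:  # hypothetical stand-in for CircuitBreakerStateData
    service_name: str
    state: str
    opened_at: Optional[datetime] = None


def get_open_states(all_states: List[State], limit: Optional[int] = None) -> List[State]:
    # Keep only OPEN breakers, oldest opening first.
    open_states = [s for s in all_states if s.state == "open"]
    open_states.sort(key=lambda s: s.opened_at)
    # limit=None means "no limit".
    return open_states if limit is None else open_states[:limit]


states = [
    State("search", "open", datetime(2024, 6, 1, 12, 5)),
    State("billing", "closed"),
    State("payments", "open", datetime(2024, 6, 1, 12, 0)),
]

all_open = [s.service_name for s in get_open_states(states)]
first_open = [s.service_name for s in get_open_states(states, limit=1)]
```

Here all_open is ["payments", "search"] and first_open is ["payments"], since payments opened first.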

reset abstractmethod

reset(service_name: str) -> bool

Reset circuit breaker to initial closed state

delete_state abstractmethod

delete_state(service_name: str) -> bool

Delete circuit breaker state entirely.

Used by reconciliation jobs to remove orphaned CB entries.

Parameters:

Name Type Description Default
service_name str

Service identifier (may be Composite Key)

required

Returns:

Type Description
bool

True if deleted, False if not found

atomic_force_open abstractmethod

atomic_force_open(
    service_name: str,
    reason: str = "",
    controlled_by_id: int | None = None,
    ttl_minutes: int = 90,
) -> tuple[bool, str, str]

Atomically force open a circuit breaker.

This method MUST use row-level locking to prevent concurrent modifications. Creates the circuit breaker if it doesn't exist.

Parameters:

Name Type Description Default
service_name str

Name of the service

required
reason str

Reason for opening

''
controlled_by_id int | None

User ID who initiated the change

None
ttl_minutes int

TTL for manual override

90

Returns:

Type Description
tuple[bool, str, str]

Tuple of (success, previous_state, new_state)

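Example Django implementation:

    from django.db import transaction

    def atomic_force_open(self, service_name, reason="", controlled_by_id=None, ttl_minutes=90):
        with transaction.atomic():
            # select_for_update() holds the row lock until the transaction ends.
            state, created = CircuitBreakerState.objects.select_for_update().get_or_create(
                service_name=service_name
            )
            previous = state.state
            state.state = 'open'
            state.manually_controlled = True
            state.save()
            return (True, previous, 'open')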

atomic_force_close abstractmethod

atomic_force_close(
    service_name: str,
    reason: str = "",
    controlled_by_id: int | None = None,
) -> tuple[bool, str, str]

Atomically force close a circuit breaker.

This method MUST use row-level locking to prevent concurrent modifications.

Parameters:

Name Type Description Default
service_name str

Name of the service

required
reason str

Reason for closing

''
controlled_by_id int | None

User ID who initiated the change

None

Returns:

Type Description
tuple[bool, str, str]

Tuple of (success, previous_state, new_state)

atomic_reset abstractmethod

atomic_reset(
    service_name: str,
    reason: str = "",
    controlled_by_id: int | None = None,
) -> tuple[bool, str, str]

Atomically reset a circuit breaker to initial state.

This method MUST use row-level locking to prevent concurrent modifications. Resets all counters and clears manual control.

Parameters:

Name Type Description Default
service_name str

Name of the service

required
reason str

Reason for reset

''
controlled_by_id int | None

User ID who initiated the change

None

Returns:

Type Description
tuple[bool, str, str]

Tuple of (success, previous_state, new_state)

try_acquire_half_open_slot abstractmethod

try_acquire_half_open_slot(
    service_name: str,
    limit: int,
    stuck_timeout_seconds: int,
) -> tuple[bool, str, str]

Atomically acquire a HALF_OPEN trial slot.

This is the single race-free entry point for the should_allow HALF_OPEN branch. Implementations MUST perform the state-machine evaluation and the counter increment as one atomic operation (Redis Lua / SQL SELECT FOR UPDATE / in-memory under RLock).

State-machine branches (in evaluation order):

  1. state == "half_open" AND count >= limit AND now - half_open_window_started_at > stuck_timeout_seconds: Stuck-window auto-reset. Treat as a fresh OPEN→HALF_OPEN combo: reset half_open_request_count = 1, success_count = 0, refresh the window watermark. Return (True, "half_open", "half_open").
  2. state == "open" (recovery_timeout already verified by the caller): Atomic OPEN→HALF_OPEN transition: write state = "half_open", success_count = 0, half_open_request_count = 1, set the window watermark. Return (True, "open", "half_open").
  3. state == "half_open" AND count < limit: increment the counter. Return (True, "half_open", "half_open").
  4. state == "half_open" AND count >= limit (within window): Reject. Return (False, "half_open", "half_open").
  5. Otherwise (CLOSED, manual override, etc.): no-op. Return (False, current_state, current_state).

Parameters:

Name Type Description Default
service_name str

Service identifier

required
limit int

Maximum HALF_OPEN trial slots (cluster-wide)

required
stuck_timeout_seconds int

Window age (seconds) past which a maxed-out HALF_OPEN window is considered stalled (worker died mid-trial) and auto-reset on the next acquire

required

Returns:

Type Description
tuple[bool, str, str]

Tuple of (allowed, previous_state, new_state). The service emits CIRCUIT_BREAKER_HALF_OPENED iff previous_state == "open" AND new_state == "half_open".
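The five state-machine branches can be sketched as an in-memory gate guarded by an RLock. This is an illustration under assumptions: rows are plain dicts with a hypothetical window_started_at key for the watermark, the clock is injectable for testing, and manual override is not modelled, so it would fall through to the final no-op branch.

```python
import threading
import time


class HalfOpenGate:
    def __init__(self, rows, clock=time.monotonic):
        self._lock = threading.RLock()
        self._rows = rows      # service_name -> dict (hypothetical shape)
        self._clock = clock    # injectable so tests can control time

    def try_acquire_half_open_slot(self, service_name, limit, stuck_timeout_seconds):
        with self._lock:  # evaluation and increment form one atomic step
            row = self._rows[service_name]
            now = self._clock()
            state = row["state"]
            count = row["half_open_request_count"]
            # 1. Stuck-window auto-reset.
            if (state == "half_open" and count >= limit
                    and now - row["window_started_at"] > stuck_timeout_seconds):
                row.update(half_open_request_count=1, success_count=0,
                           window_started_at=now)
                return (True, "half_open", "half_open")
            # 2. OPEN -> HALF_OPEN (recovery timeout verified by the caller).
            if state == "open":
                row.update(state="half_open", success_count=0,
                           half_open_request_count=1, window_started_at=now)
                return (True, "open", "half_open")
            # 3. Free slot available in the current window.
            if state == "half_open" and count < limit:
                row["half_open_request_count"] = count + 1
                return (True, "half_open", "half_open")
            # 4. Window full and not yet stale: reject.
            if state == "half_open":
                return (False, "half_open", "half_open")
            # 5. CLOSED and anything else: no-op.
            return (False, state, state)


fake_now = [0.0]
rows = {"svc": {"state": "open", "success_count": 0,
                "half_open_request_count": 0, "window_started_at": 0.0}}
gate = HalfOpenGate(rows, clock=lambda: fake_now[0])

r1 = gate.try_acquire_half_open_slot("svc", limit=2, stuck_timeout_seconds=60)
r2 = gate.try_acquire_half_open_slot("svc", limit=2, stuck_timeout_seconds=60)
r3 = gate.try_acquire_half_open_slot("svc", limit=2, stuck_timeout_seconds=60)
fake_now[0] = 61.0  # the window is now older than the stuck timeout
r4 = gate.try_acquire_half_open_slot("svc", limit=2, stuck_timeout_seconds=60)
```

The four calls walk branches 2, 3, 4, and 1 in turn; only r1 reports an open to half_open transition, which is the case where the service would emit CIRCUIT_BREAKER_HALF_OPENED.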

reset_half_open_count abstractmethod

reset_half_open_count(service_name: str) -> None

Reset the HALF_OPEN counter and clear the window watermark.

Used by manual_control flows where a counter reset is needed without a state change. Transitions out of HALF_OPEN should prefer update_state(..., reset_half_open_count=True), which resets the counter in the same round-trip as the state change.

Parameters:

Name Type Description Default
service_name str

Service identifier

required