baldur.adapters.memory — Core State Repositories

In-memory implementations of the highest-traffic repository interfaces: failed-operation persistence and circuit-breaker state. Intended for testing, standalone (non-framework) usage, and prototyping.

InMemoryFailedOperationRepository

InMemoryFailedOperationRepository()

Bases: FailedOperationRepository

In-memory implementation of FailedOperationRepository.

Thread-safe storage for DLQ entries in memory. Data is lost when the process exits.

Maintains status/domain indexes for O(1) lookup instead of O(n) scan.
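
The indexing idea can be sketched as follows. This is a minimal illustration, not the adapter's code: TinyIndexedStore and its attribute names are hypothetical, and the status strings are placeholders. Each record id is kept in one set per status and one set per domain, so a status/domain lookup is a set intersection rather than a scan over every record.

```python
from collections import defaultdict


class TinyIndexedStore:
    """Hypothetical store illustrating secondary indexes; not baldur's code."""

    def __init__(self) -> None:
        self._rows = {}                     # id -> {"status": ..., "domain": ...}
        self._by_status = defaultdict(set)  # status -> set of ids
        self._by_domain = defaultdict(set)  # domain -> set of ids

    def add(self, id: str, status: str, domain: str) -> None:
        self._rows[id] = {"status": status, "domain": domain}
        self._by_status[status].add(id)
        self._by_domain[domain].add(id)

    def set_status(self, id: str, status: str) -> None:
        row = self._rows[id]
        # Move the id between status buckets so the index stays in sync.
        self._by_status[row["status"]].discard(id)
        row["status"] = status
        self._by_status[status].add(id)

    def ids_for(self, status: str, domain: str) -> set:
        # Intersect two index buckets; self._rows is never scanned.
        return self._by_status.get(status, set()) & self._by_domain.get(domain, set())


store = TinyIndexedStore()
store.add("a", "pending", "payment")
store.add("b", "pending", "shipping")
store.add("c", "resolved", "payment")
store.set_status("b", "resolved")
```

The cost of the faster reads is that every write path must update the indexes together with the primary dict.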

create

create(
    domain: str,
    failure_type: str,
    error_message: str = "",
    error_code: str = "",
    entity_type: str | None = None,
    entity_id: str | None = None,
    entity_refs: dict[str, Any] | None = None,
    user_id: int | None = None,
    snapshot_data: dict[str, Any] | None = None,
    request_data: dict[str, Any] | None = None,
    response_data: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    retry_count: int = 0,
    max_retries: int = 2,
    next_action_hint: str = "",
    recommended_action: str = "",
    expires_at: datetime | None = None,
) -> FailedOperationData

Create a new failed operation record (domain-neutral).

get_by_id

get_by_id(id: str) -> FailedOperationData | None

Get a failed operation by ID.

get_pending_by_domain

get_pending_by_domain(
    domain: str, limit: int = 100
) -> list[FailedOperationData]

Get pending operations for a specific domain.

get_pending_count_by_domain

get_pending_count_by_domain(domain: str) -> int

Get count of pending operations for a domain.

update_status

update_status(
    id: str,
    status: str,
    resolution_type: str = "",
    resolution_note: str = "",
    resolved_by_id: int | None = None,
    recommended_action: str = "",
) -> bool

Update the status of a failed operation.

increment_retry_count

increment_retry_count(id: str) -> bool

Increment retry count and update last_retry_at.

mark_as_resolved

mark_as_resolved(
    id: str,
    resolution_type: str,
    resolution_note: str = "",
    resolved_by_id: int | None = None,
) -> bool

Mark a failed operation as resolved.

get_expired_operations

get_expired_operations(
    before_date: datetime, limit: int = 100
) -> list[FailedOperationData]

Get operations that have expired.

bulk_update_status

bulk_update_status(ids: list[str], status: str) -> int

Bulk update status for multiple operations.

find_by_status

find_by_status(
    status: str,
    domain: str | None = None,
    failure_type: str | None = None,
    limit: int = 100,
) -> list[FailedOperationData]

Find operations by status with optional filters.

find

find(
    *,
    status: str | None = None,
    domain: str | None = None,
    failure_type: str | None = None,
    offset: int = 0,
    limit: int = 100
) -> list[FailedOperationData]

Paginated cross-status query ordered by created_at DESC.

count

count(
    *,
    status: str | None = None,
    domain: str | None = None,
    failure_type: str | None = None
) -> int

Count operations matching filters (pre-slice set size).
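
The relationship between find and count can be sketched as below, assuming plain dict rows and module-level functions (both hypothetical, chosen only for illustration). find sorts the filtered rows by created_at descending and then slices; count reports the size of the filtered set before any slicing, which is what a pager needs for the total.

```python
from __future__ import annotations

from datetime import datetime, timedelta

base = datetime(2024, 1, 1)
# Hypothetical rows: odd indexes are pending, even indexes are resolved.
rows = [
    {
        "id": f"op-{i}",
        "status": "pending" if i % 2 else "resolved",
        "created_at": base + timedelta(minutes=i),
    }
    for i in range(5)
]


def _matching(status: str | None) -> list[dict]:
    return [r for r in rows if status is None or r["status"] == status]


def find(*, status: str | None = None, offset: int = 0, limit: int = 100) -> list[dict]:
    # Newest first, then apply the page slice.
    ordered = sorted(_matching(status), key=lambda r: r["created_at"], reverse=True)
    return ordered[offset:offset + limit]


def count(*, status: str | None = None) -> int:
    # Size of the filtered set, independent of offset/limit.
    return len(_matching(status))


page = find(status="resolved", offset=1, limit=1)
total = count(status="resolved")
```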

count_created_in_window

count_created_in_window(
    start: datetime, end: datetime
) -> int

Count entries whose created_at is within the inclusive [start, end].
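
A small sketch of the inclusive-bounds behaviour, using a standalone function over a list of timestamps (a simplification; the repository method takes only start and end). Entries exactly on either bound are counted.

```python
from datetime import datetime, timedelta


def count_created_in_window(created_at_values, start, end):
    """Count timestamps in the closed interval [start, end]."""
    return sum(1 for ts in created_at_values if start <= ts <= end)


start = datetime(2024, 1, 1, 0, 0)
end = start + timedelta(hours=1)
stamps = [
    start - timedelta(seconds=1),   # just before the window: excluded
    start,                          # on the lower bound: included
    start + timedelta(minutes=30),  # inside the window: included
    end,                            # on the upper bound: included
    end + timedelta(seconds=1),     # just after the window: excluded
]
in_window = count_created_in_window(stamps, start, end)
```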

find_replayable

find_replayable(
    max_retries: int,
    domain: str | None = None,
    failure_type: str | None = None,
    limit: int = 100,
) -> list[FailedOperationData]

Find operations that can be replayed.

find_sla_breached

find_sla_breached(
    current_time: datetime,
    sla_thresholds: dict[str, timedelta],
) -> list[FailedOperationData]

Find operations that have breached their SLA.

find_expired

find_expired(
    current_time: datetime,
) -> list[FailedOperationData]

Find operations past their retention period.

get_statistics

get_statistics() -> dict[str, Any]

Get statistics about failed operations.

Adds pending-specific breakdowns for the daily report:

- pending_by_domain: {domain: pending_count} (required by update_dlq_pending_gauges; fixes a pre-existing bug)
- pending_by_domain_and_failure_type: {domain: {failure_type: count}} (powers DLQPendingBreakdown in the daily report)

The memory adapter iterates the pending index, which is O(N) in the number of pending entries.

get_facet_counts

get_facet_counts(
    *, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]

Faceted status×domain counts via the 1D/2D indexes.

by_status is scoped by domain; by_domain is scoped by status (faceted-search semantics). Empty buckets are dropped explicitly with an "if ids" guard. This is needed because _remove_from_index discards ids without deleting an emptied set, so a fully drained status/domain key lingers as an empty set and would otherwise surface with a count of 0, breaking zero-drop parity with the SQL/Redis adapters.
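
The empty-bucket guard can be illustrated with a plain dict of sets. The helper name facet_counts and the sample data are hypothetical; only the filtering pattern reflects the description above.

```python
def facet_counts(index):
    """Count ids per key, skipping keys whose set has been drained."""
    return {key: len(ids) for key, ids in index.items() if ids}


by_status = {"pending": {"a", "b"}, "resolved": {"c"}}
# Discarding the last id leaves the key behind with an empty set.
by_status["resolved"].discard("c")
counts = facet_counts(by_status)
```

Without the guard, the comprehension would emit a zero count for the drained key.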

try_acquire_for_replay

try_acquire_for_replay(
    id: str, max_retries: int, force: bool = False
) -> FailedOperationData | None

Atomically acquire a DLQ entry for replay.

force=True bypasses the cap gate (operator cap-override): it accepts an entry whose current status is PENDING or REQUIRES_REVIEW, stamps a history marker (the "scar") in metadata, and only then resets retry_count to a fresh budget, so that this redrive counts as attempt 1. See FailedOperationRepository.try_acquire_for_replay.
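
A hedged sketch of the acquire step follows. Several details are assumptions, not taken from the adapter: the Entry dataclass, the lowercase status strings, the metadata key forced_redrives, the non-force gate (pending and under the cap), and reading "attempt 1" as retry_count = 1. What it is meant to show is that the eligibility check and the status transition happen under one lock, so two callers cannot both acquire the same entry.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Entry:  # hypothetical stand-in for FailedOperationData
    id: str
    status: str = "pending"
    retry_count: int = 0
    metadata: dict[str, Any] = field(default_factory=dict)


class ReplayStore:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._entries: dict[str, Entry] = {}

    def put(self, entry: Entry) -> None:
        with self._lock:
            self._entries[entry.id] = entry

    def try_acquire_for_replay(
        self, id: str, max_retries: int, force: bool = False
    ) -> Entry | None:
        with self._lock:  # check and transition form one critical section
            entry = self._entries.get(id)
            if entry is None:
                return None
            if force:
                if entry.status not in {"pending", "requires_review"}:
                    return None
                # Record the pre-reset count first, then grant a fresh budget.
                entry.metadata.setdefault("forced_redrives", []).append(
                    {"retry_count_before_reset": entry.retry_count}
                )
                entry.retry_count = 1  # assumption: the redrive is attempt 1
            elif entry.status != "pending" or entry.retry_count >= max_retries:
                # Assumed non-force gate: must be pending and under the cap.
                return None
            entry.status = "replaying"
            return entry


store = ReplayStore()
store.put(Entry(id="x", retry_count=2))
refused = store.try_acquire_for_replay("x", max_retries=2)  # at the cap
forced = store.try_acquire_for_replay("x", max_retries=2, force=True)
again = store.try_acquire_for_replay("x", max_retries=2, force=True)  # already replaying
```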

complete_replay

complete_replay(
    id: str,
    success: bool,
    resolution_type: str = "",
    note: str = "",
    resolved_by_id: int | None = None,
    error_details: dict[str, Any] | None = None,
) -> bool

Complete a replay operation by updating the final status.

release_stale_replaying

release_stale_replaying(
    older_than_minutes: int = 30,
) -> int

Release DLQ entries stuck in REPLAYING state.

clear

clear() -> None

Clear all entries (for testing).

archive_old_resolved

archive_old_resolved(older_than_days: int = 30) -> int

Archive resolved entries older than N days.

purge_archived

purge_archived(
    ids: list[str] | None = None,
    older_than_days: int | None = None,
) -> int

Permanently delete archived entries.

count_archived_older_than

count_archived_older_than(older_than_days: int) -> int

Count archived entries older than N days.

count_all

count_all() -> int

Return active DLQ item count (excludes resolved/rejected/archived).

Matches Redis adapter semantics where resolved entries are removed from the PENDING_KEY sorted set.

count_by_domain

count_by_domain(domain: str) -> int

Return DLQ item count for a specific domain.

get_oldest_ids

get_oldest_ids(
    count: int, domain: str | None = None
) -> list[str]

Return IDs of the oldest items (by created_at).

delete

delete(entry_id: str) -> bool

Delete a single entry by ID. Returns True if deleted.

evict_oldest

evict_oldest(count: int, domain: str | None = None) -> int

Delete the oldest items, skipping entries in protected statuses.
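
One possible reading of this behaviour is sketched below. The set of protected statuses is not listed here, so PROTECTED is a placeholder, and the sketch assumes count is the number of entries actually deleted rather than the number examined. Entries are plain dicts for brevity.

```python
from __future__ import annotations

from datetime import datetime, timedelta

PROTECTED = {"replaying"}  # assumption: the real protected set may differ


def evict_oldest(entries: dict[str, dict], count: int, domain: str | None = None) -> int:
    """Delete up to `count` unprotected entries, oldest created_at first."""
    candidates = sorted(
        (e for e in entries.values() if domain is None or e["domain"] == domain),
        key=lambda e: e["created_at"],
    )
    deleted = 0
    for entry in candidates:
        if deleted >= count:
            break
        if entry["status"] in PROTECTED:
            continue  # skipped entries do not use up the eviction budget
        del entries[entry["id"]]
        deleted += 1
    return deleted


base = datetime(2024, 1, 1)
entries = {
    key: {"id": key, "domain": "payment", "status": status,
          "created_at": base + timedelta(minutes=i)}
    for i, (key, status) in enumerate(
        [("a", "replaying"), ("b", "pending"), ("c", "pending"), ("d", "pending")]
    )
}
evicted = evict_oldest(entries, 2)
```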

get_cleanup_stats

get_cleanup_stats() -> dict[str, Any]

Get statistics for cleanup operations.

compress_and_evict_oldest

compress_and_evict_oldest(
    count: int, domain: str | None = None
) -> int

Compress, then evict, the oldest entries (in-memory implementation).

Same logical flow as the Redis adapter, but backed by a Python dict/list.

store_compressed_entry

store_compressed_entry(entry: DLQCompressedEntry) -> bool

Store compressed entry in memory dict.

get_compressed_entries

get_compressed_entries(
    domain: str | None = None,
    status: str | None = None,
    limit: int = 100,
) -> list[DLQCompressedEntry]

Query compressed entries from memory, newest first.

get_compressed_summary

get_compressed_summary() -> dict[str, Any]

Aggregate statistics of compressed entries.

update_compressed_status

update_compressed_status(
    entry_id: str, new_status: str
) -> bool

Transition compressed entry lifecycle status.

InMemoryCircuitBreakerStateRepository

InMemoryCircuitBreakerStateRepository(
    sliding_window_size: int = 100,
)

Bases: CircuitBreakerStateRepository

In-memory implementation of CircuitBreakerStateRepository.

Thread-safe storage for circuit breaker states in memory.

When sliding_window_size is specified, record_failure() and record_success() perform ring-buffer-based sliding-window counting: failure_count and success_count reflect the counts within the most recent N calls, where N is sliding_window_size.
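
A minimal sketch of ring-buffer window counting, assuming a collections.deque with maxlen as the buffer (the adapter's actual data structure is not shown here; SlidingWindowCounter is a hypothetical name). When the buffer is full, the outcome about to be evicted is subtracted before the new one is added, so both counters always describe the last N calls.

```python
from collections import deque


class SlidingWindowCounter:
    """Hypothetical ring buffer of call outcomes; True means failure."""

    def __init__(self, size: int = 100) -> None:
        self._window = deque(maxlen=size)
        self.failure_count = 0
        self.success_count = 0

    def record(self, failed: bool) -> None:
        if len(self._window) == self._window.maxlen:
            # Full buffer: the oldest outcome is about to be evicted.
            if self._window[0]:
                self.failure_count -= 1
            else:
                self.success_count -= 1
        self._window.append(failed)  # deque drops the oldest item itself
        if failed:
            self.failure_count += 1
        else:
            self.success_count += 1


window = SlidingWindowCounter(size=3)
for failed in (True, True, False, False):
    window.record(failed)
```

After the fourth call the first failure has left the window, so only one failure remains counted.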

get_by_service_name

get_by_service_name(
    service_name: str,
) -> CircuitBreakerStateData | None

Get circuit breaker state by service name.

get_or_create

get_or_create(service_name: str) -> CircuitBreakerStateData

Get or create a circuit breaker state.

Uses double-checked locking: steady-state callers (entry exists) bypass the RLock entirely and pay only a GIL-atomic dict.get. Only the first-call create needs the lock. This mirrors the precedent in protect.py and the project-wide policy in factory/base.py ("read-only dict operations rely on CPython GIL atomicity").

This removes the read-path lock acquire that was the contention floor under high concurrency.
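
The pattern can be sketched as below. State and StateStore are hypothetical stand-ins, and the sketch relies on the same CPython assumption as the text above: a single dict.get is atomic under the GIL.

```python
import threading
from dataclasses import dataclass


@dataclass
class State:  # hypothetical stand-in for CircuitBreakerStateData
    service_name: str
    state: str = "closed"


class StateStore:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._states = {}

    def get_or_create(self, service_name: str) -> State:
        # Fast path: lock-free read of an existing entry.
        existing = self._states.get(service_name)
        if existing is not None:
            return existing
        with self._lock:
            # Second check: another thread may have created it meanwhile.
            existing = self._states.get(service_name)
            if existing is None:
                existing = State(service_name)
                self._states[service_name] = existing
            return existing


store = StateStore()
results = []
threads = [
    threading.Thread(target=lambda: results.append(store.get_or_create("billing")))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Every thread receives the same object: at most one of them takes the create branch, and the rest return from either the fast path or the second check.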

update_state

update_state(
    service_name: str,
    state: str,
    failure_count: int | None = None,
    success_count: int | None = None,
    opened_at: datetime | None = None,
    last_failure_at: datetime | None = None,
    half_open_request_count: int | None = None,
    reset_half_open_count: bool = False,
) -> bool

Update circuit breaker state.

increment_failure_count

increment_failure_count(
    service_name: str,
    last_failure_at: datetime | None = None,
) -> int

Increment failure count.

reset_counts

reset_counts(service_name: str) -> bool

Reset failure/success counts and clear the OPEN-era timestamp.

opened_at is cleared alongside the counters so a CLOSED DTO does not carry a stale OPEN-era timestamp. The L2-authoritative close path in LayeredCircuitBreakerStateRepository invokes this before transitioning L1 to CLOSED.

set_manual_control

set_manual_control(
    service_name: str,
    state: str,
    controlled_by_id: int | None = None,
    reason: str = "",
    expires_at: datetime | None = None,
) -> bool

Set manual control override.

clear_manual_control

clear_manual_control(
    service_name: str, preserve_reason: bool = False
) -> bool

Clear manual control override.

Only the manual-control flag is cleared. state and the counters (failure_count, success_count) are not modified. If a state transition is needed, the caller must invoke update_state first.

record_failure

record_failure(
    service_name: str,
) -> CircuitBreakerStateData

Record a failure and return updated state.

Records a failure into the Sliding Window ring buffer, then updates state using the in-window failure/success counts.

record_success

record_success(
    service_name: str,
) -> CircuitBreakerStateData

Record a success and return updated state.

Records a success into the Sliding Window ring buffer, then updates state using the in-window failure/success counts.

record_success_with_close_check

record_success_with_close_check(
    service_name: str, success_threshold: int
) -> CircuitBreakerCloseAttempt

Atomic record-success + threshold-check + close transition.

The whole sequence executes under self._lock: the eviction-aware window increment, the threshold check, and (if the threshold is crossed from HALF_OPEN) the close transition with _clear_window form one critical section. This closes the TOCTOU race in which multiple stale-view callers each pass the threshold check and emit duplicate CIRCUIT_BREAKER_CLOSED events for the same logical recovery.
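
A simplified sketch of why the single critical section matters. The Breaker class is hypothetical, it returns a bool instead of a CircuitBreakerCloseAttempt, and resetting success_count stands in for _clear_window. Because increment, check, and transition share one lock, exactly one caller observes the close.

```python
import threading


class Breaker:
    """Hypothetical breaker that starts in HALF_OPEN."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.state = "half_open"
        self.success_count = 0

    def record_success_with_close_check(self, success_threshold: int) -> bool:
        """Return True only for the call that performs the close transition."""
        with self._lock:
            self.success_count += 1
            if self.state == "half_open" and self.success_count >= success_threshold:
                self.state = "closed"
                self.success_count = 0  # stands in for clearing the window
                return True
            return False


breaker = Breaker()
closed_flags = []
threads = [
    threading.Thread(
        target=lambda: closed_flags.append(breaker.record_success_with_close_check(3))
    )
    for _ in range(16)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

If the threshold check ran outside the lock, several threads could each see the count at or above the threshold and report the close more than once.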

get_all_states

get_all_states() -> list[CircuitBreakerStateData]

Get all circuit breaker states.

reset

reset(service_name: str) -> bool

Reset circuit breaker to initial closed state.

atomic_force_open

atomic_force_open(
    service_name: str,
    reason: str = "",
    controlled_by_id: int | None = None,
    ttl_minutes: int = 90,
) -> tuple[bool, str, str]

Atomically force open a circuit breaker.

atomic_force_close

atomic_force_close(
    service_name: str,
    reason: str = "",
    controlled_by_id: int | None = None,
) -> tuple[bool, str, str]

Atomically force close a circuit breaker.

atomic_reset

atomic_reset(
    service_name: str,
    reason: str = "",
    controlled_by_id: int | None = None,
) -> tuple[bool, str, str]

Atomically reset a circuit breaker to initial state.

try_acquire_half_open_slot

try_acquire_half_open_slot(
    service_name: str,
    limit: int,
    stuck_timeout_seconds: int,
) -> tuple[bool, str, str]

Atomic HALF_OPEN slot acquisition under RLock.

reset_half_open_count

reset_half_open_count(service_name: str) -> None

Reset HALF_OPEN counter and clear window watermark.

get_open_states

get_open_states(
    limit: int | None = None,
) -> list[CircuitBreakerStateData]

Get OPEN circuit breaker states, oldest-first.

get_all_open

get_all_open() -> list[CircuitBreakerStateData]

Get all open circuit breakers.

delete

delete(service_name: str) -> bool

Delete a circuit breaker state.

delete_state

delete_state(service_name: str) -> bool

Delete circuit breaker state (alias for delete).

clear

clear() -> None

Clear all entries (for testing).