baldur.adapters.memory — Drift Reconciliation & Shadow Logging

This page covers the layered (L1 + L2) consistency machinery: drift reconciliation between the in-memory tier (L1) and its backing store (L2), and the shadow logger that records L2 sync failures for later replay.

Drift reconciliation

DriftReconciler

DriftReconciler(
    min_jitter_seconds: float = 0.0,
    max_jitter_seconds: float = 5.0,
    on_reconciled: (
        Callable[[DriftReconciliationRecord], None] | None
    ) = None,
)

Resolves state drift after L2 recovery.

While L2 is unavailable only L1 is updated; once L2 recovers the two caches can disagree. This class resolves drift using a "Most Restrictive Wins" strategy.

Priority: OPEN (3) > HALF_OPEN (2) > CLOSED (1)

- The more restrictive state wins (safety first).
- On equal state, the most recent timestamp wins.

Thundering herd mitigation:

- On L2 recovery, every pod could rush to write at the same time and overload L2.
- Jitter is applied so that writes are spread out over time.

Parameters:

- min_jitter_seconds (float, default 0.0): Minimum jitter, in seconds.
- max_jitter_seconds (float, default 5.0): Maximum jitter, in seconds.
- on_reconciled (Callable[[DriftReconciliationRecord], None] | None, default None): Callback fired after reconciliation (metrics, logging, etc.).

get_jitter

get_jitter() -> float

Generate a random jitter value in seconds, between min_jitter_seconds (0.0 by default) and max_jitter_seconds.
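As a rough illustration of how such a jitter value might be drawn, the sketch below uses the standard library's random.uniform. The helper name pick_jitter and the uniform distribution are assumptions made for this example; the actual get_jitter implementation is not shown on this page.

```python
import random


def pick_jitter(min_seconds: float = 0.0, max_seconds: float = 5.0) -> float:
    """Hypothetical helper: return a random delay in [min_seconds, max_seconds]."""
    if min_seconds < 0 or max_seconds < min_seconds:
        raise ValueError("expected 0 <= min_seconds <= max_seconds")
    # random.uniform draws from a uniform distribution over the given range.
    return random.uniform(min_seconds, max_seconds)


# Ten pods recovering at the same moment each draw their own delay,
# so their writes to L2 are spread across the 5-second window.
delays = [pick_jitter(0.0, 5.0) for _ in range(10)]
```

A wider window spreads the load further but also delays convergence, so the maximum is a trade-off between protecting L2 and how long stale state is tolerated.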

reconcile

reconcile(
    service_name: str,
    l1_state: str,
    l2_state: str,
    l1_updated_at: datetime | None = None,
    l2_updated_at: datetime | None = None,
) -> tuple[str, DriftReconciliationResult]

Drift resolution strategy:

1. The more restrictive state wins (Most Restrictive Wins).
2. On equal level, the most recent timestamp wins.

Parameters:

- service_name (str, required): Service name.
- l1_state (str, required): L1 state (closed, half_open, open).
- l2_state (str, required): L2 state (same values as l1_state).
- l1_updated_at (datetime | None, default None): L1 last update time.
- l2_updated_at (datetime | None, default None): L2 last update time.

Returns:

- tuple[str, DriftReconciliationResult]: (winning state, reconciliation result)
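The strategy can be sketched as a small standalone function. This is a minimal illustration, not the library's code: the name resolve_drift, the "l1"/"l2" source labels (standing in for DriftReconciliationResult, whose members are not listed here), and the choice to let L1 win when timestamps are missing or equal are all assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Priority values as documented: OPEN (3) > HALF_OPEN (2) > CLOSED (1).
PRIORITY = {"closed": 1, "half_open": 2, "open": 3}


def resolve_drift(
    l1_state: str,
    l2_state: str,
    l1_updated_at: Optional[datetime] = None,
    l2_updated_at: Optional[datetime] = None,
) -> tuple[str, str]:
    """Return (winning_state, source), where source is "l1" or "l2"."""
    p1, p2 = PRIORITY[l1_state], PRIORITY[l2_state]
    # Rule 1: the more restrictive state wins.
    if p1 > p2:
        return l1_state, "l1"
    if p2 > p1:
        return l2_state, "l2"
    # Rule 2: same level, so the most recent timestamp decides.
    if (
        l1_updated_at is not None
        and l2_updated_at is not None
        and l2_updated_at > l1_updated_at
    ):
        return l2_state, "l2"
    # Assumption: L1 wins when timestamps are missing or equal.
    return l1_state, "l1"


now = datetime.now(timezone.utc)
print(resolve_drift("closed", "open"))  # ('open', 'l2')
print(resolve_drift("half_open", "half_open", now - timedelta(seconds=5), now))
```

Letting the more restrictive state win means a breaker that tripped on either tier stays tripped after recovery, at the cost of occasionally keeping a circuit open slightly longer than necessary.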

schedule_reconciliation_sync

schedule_reconciliation_sync(
    service_name: str, do_reconcile: Callable[[], None]
) -> float

Apply jitter, sleep synchronously, then run reconciliation.

Parameters:

- service_name (str, required): Service name.
- do_reconcile (Callable[[], None], required): Function that performs the actual reconciliation.

Returns:

- float: Applied jitter, in seconds.
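The described flow (pick a jitter, block for that long, run the callback, return the delay) might look like the following sketch. The function run_with_jitter and its injectable sleep parameter are hypothetical and exist only for this example.

```python
import random
import time
from typing import Callable


def run_with_jitter(
    do_reconcile: Callable[[], None],
    min_seconds: float = 0.0,
    max_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> float:
    """Sleep for a random delay, run do_reconcile, and return the delay."""
    jitter = random.uniform(min_seconds, max_seconds)
    sleep(jitter)  # blocks the calling thread for the whole delay
    do_reconcile()
    return jitter


calls: list[str] = []
# A tiny window keeps the example fast; production windows would be longer.
applied = run_with_jitter(lambda: calls.append("reconciled"), 0.0, 0.01)
```

Because the sleep blocks the calling thread, a synchronous variant like this is best run from a worker thread rather than a request handler.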

schedule_reconciliation_async async

schedule_reconciliation_async(
    service_name: str, do_reconcile: Callable[[], None]
) -> float

Apply jitter, sleep asynchronously, then run reconciliation.

Parameters:

- service_name (str, required): Service name.
- do_reconcile (Callable[[], None], required): Function that performs the actual reconciliation.

Returns:

- float: Applied jitter, in seconds.
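An asynchronous variant of the same flow could be sketched with asyncio.sleep, which yields to the event loop instead of blocking a thread. The function run_with_jitter_async and the service names are hypothetical; as in the signature above, the callback is assumed to be a plain synchronous callable.

```python
import asyncio
import random
from typing import Callable

finished: list[str] = []


async def run_with_jitter_async(
    do_reconcile: Callable[[], None],
    min_seconds: float = 0.0,
    max_seconds: float = 5.0,
) -> float:
    """Await a random delay, run do_reconcile, and return the delay."""
    jitter = random.uniform(min_seconds, max_seconds)
    await asyncio.sleep(jitter)  # other tasks keep running during the delay
    do_reconcile()
    return jitter


async def main() -> list[float]:
    services = ("payments", "search", "email")  # hypothetical service names
    tasks = [
        # name=name binds the current value for each lambda.
        run_with_jitter_async(lambda name=name: finished.append(name), 0.0, 0.01)
        for name in services
    ]
    # All three delays elapse concurrently on one event loop.
    return await asyncio.gather(*tasks)


delays = asyncio.run(main())
```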

get_history

get_history() -> list[DriftReconciliationRecord]

Return the reconciliation history.

get_stats

get_stats() -> dict[str, Any]

Return reconciliation statistics.

clear_history

clear_history() -> None

Clear history (for tests).

DriftReconciliationResult

Bases: str, Enum

Drift reconciliation outcome.

DriftReconciliationRecord dataclass

DriftReconciliationRecord(
    service_name: str,
    l1_state: str,
    l2_state: str,
    l1_updated_at: datetime | None,
    l2_updated_at: datetime | None,
    winner: str,
    result: DriftReconciliationResult,
    reconciled_at: datetime = (lambda: utc_now())(),
    jitter_seconds: float = 0.0,
)

Drift reconciliation record.

Captures the resolution of an L1/L2 state mismatch after L2 recovery.

get_drift_reconciler

get_drift_reconciler() -> DriftReconciler

Get the singleton DriftReconciler instance.

Shadow logging

ShadowLogger

Records state changes locally during an L2 outage.

The Shadow Log keeps an in-memory record of every state change that occurs while L2 is in a failed state, supporting resynchronization after L2 recovery and forensic analysis.

The implementation is thread-safe under concurrent access.

reset_instance classmethod

reset_instance() -> None

Reset singleton instance for test isolation.

set_max_entries

set_max_entries(max_entries: int) -> None

Set maximum entries to keep.
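One common way to build a capped, thread-safe in-memory log is a deque with maxlen guarded by a lock, sketched below. The class BoundedLog is a hypothetical stand-in, and dropping the oldest entries first is an assumption; this page does not state which entries ShadowLogger discards when the limit is reached.

```python
import threading
from collections import deque


class BoundedLog:
    """Hypothetical stand-in for a capped, thread-safe in-memory log."""

    def __init__(self, max_entries: int = 1000) -> None:
        self._lock = threading.Lock()
        self._entries = deque(maxlen=max_entries)

    def set_max_entries(self, max_entries: int) -> None:
        with self._lock:
            # Rebuilding the deque keeps only the newest max_entries items.
            self._entries = deque(self._entries, maxlen=max_entries)

    def append(self, entry: str) -> None:
        with self._lock:
            # When the deque is full, the oldest entry is dropped automatically.
            self._entries.append(entry)

    def snapshot(self) -> list:
        with self._lock:
            # Return a copy so callers never iterate over shared state.
            return list(self._entries)


log = BoundedLog(max_entries=3)
for entry in ["a", "b", "c", "d", "e"]:
    log.append(entry)
print(log.snapshot())  # ['c', 'd', 'e']
```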

record_sync_failure

record_sync_failure(
    service_name: str,
    intended_state: str,
    error: Exception,
    adapter_type: str = "unknown",
    operation: str = "sync",
) -> None

Record an L2 sync failure.

Parameters:

- service_name (str, required): Service name.
- intended_state (str, required): The state we tried to sync.
- error (Exception, required): Raised exception.
- adapter_type (str, default 'unknown'): L2 adapter type (redis, django, etc.).
- operation (str, default 'sync'): Operation kind (sync, update, delete).

get_unsynced_records

get_unsynced_records() -> list[L2SyncFailureRecord]

List records that have not yet been synced.

get_all_records

get_all_records() -> list[L2SyncFailureRecord]

List all records.

mark_as_synced

mark_as_synced(service_name: str) -> int

Mark records as synced after recovery.

Parameters:

- service_name (str, required): Service name.

Returns:

- int: Number of records marked.
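The marking step can be illustrated with a trimmed-down record type. FailureRecord below is a hypothetical stand-in that borrows a few field names from L2SyncFailureRecord; skipping already-synced records and stamping recovery_time with the current UTC time are assumptions made for this sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FailureRecord:
    """Hypothetical, trimmed-down stand-in for L2SyncFailureRecord."""

    service_name: str
    intended_state: str
    synced_after_recovery: bool = False
    recovery_time: Optional[datetime] = None


def mark_as_synced(records: list, service_name: str) -> int:
    """Flag unsynced records of one service and return how many were flagged."""
    now = datetime.now(timezone.utc)
    marked = 0
    for record in records:
        if record.service_name == service_name and not record.synced_after_recovery:
            record.synced_after_recovery = True
            record.recovery_time = now  # assumption: stamped at marking time
            marked += 1
    return marked


records = [
    FailureRecord("payments", "open"),
    FailureRecord("payments", "half_open"),
    FailureRecord("search", "open"),
]
print(mark_as_synced(records, "payments"))  # 2
```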

mark_all_as_synced

mark_all_as_synced() -> int

Mark every unsynced record as synced.

get_stats

get_stats() -> dict[str, Any]

Read Shadow Log statistics.

clear

clear() -> None

Clear all records (for testing).

analyze_l2_failures

analyze_l2_failures() -> dict[str, Any]

Analyze state changes during the L2 outage window.

Integrates with the Forensic Advisor to analyze state changes that occurred during the L2 outage as a timeline.

Returns:

- dict[str, Any]: Analysis result dictionary.

get_records_by_service

get_records_by_service(
    service_name: str,
) -> list[L2SyncFailureRecord]

List failure records for a given service.

get_records_by_time_range

get_records_by_time_range(
    start_time: datetime, end_time: datetime
) -> list[L2SyncFailureRecord]

List failure records within the given time range.
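A time-range query of this kind reduces to a filter on each record's failure time. The sketch below works on bare datetimes; the helper name in_time_range and the inclusive bounds are assumptions, since this page does not say whether the endpoints are included.

```python
from datetime import datetime, timedelta, timezone


def in_time_range(failure_times: list, start_time: datetime, end_time: datetime) -> list:
    """Return the timestamps that fall within [start_time, end_time]."""
    if start_time > end_time:
        raise ValueError("start_time must not be after end_time")
    # Assumption: both bounds are inclusive.
    return [t for t in failure_times if start_time <= t <= end_time]


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
failure_times = [base + timedelta(minutes=m) for m in (0, 10, 20, 30)]

# Select failures from the first 20 minutes of the outage window.
window = in_time_range(failure_times, base, base + timedelta(minutes=20))
print(len(window))  # 3
```

Python refuses to compare timezone-aware and naive datetimes (it raises TypeError), so the bounds should carry the same kind of timezone information as the stored failure times.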

L2SyncFailureRecord dataclass

L2SyncFailureRecord(
    service_name: str,
    intended_state: str,
    failure_time: datetime,
    error_message: str,
    l1_state_at_failure: str,
    adapter_type: str = "unknown",
    operation: str = "sync",
    synced_after_recovery: bool = False,
    recovery_time: datetime | None = None,
)

L2 sync failure record.

Records state changes that occurred during an L2 outage to support forensic analysis and resynchronization after recovery.

get_shadow_logger

get_shadow_logger() -> ShadowLogger

Get the singleton ShadowLogger instance.