baldur.adapters.memory — Drift Reconciliation & Shadow Logging
This module provides the consistency machinery for the layered (L1+L2) setup: drift reconciliation between the in-memory tier and its backing store, and a shadow logger that records L2 sync failures for later replay.
Drift reconciliation
DriftReconciler
DriftReconciler(
min_jitter_seconds: float = 0.0,
max_jitter_seconds: float = 5.0,
on_reconciled: (
Callable[[DriftReconciliationRecord], None] | None
) = None,
)
Resolves state drift after L2 recovery.
While L2 is unavailable, only L1 is updated, so the two tiers can disagree once L2 recovers. This class resolves that drift with a "Most Restrictive Wins" strategy.
Priority: OPEN (3) > HALF_OPEN (2) > CLOSED (1)
- The more restrictive state wins (safety first).
- When both states are equal, the most recent timestamp wins.

Thundering herd mitigation:
- On L2 recovery, every pod could rush to write at the same time and overload L2.
- Jitter is applied so that writes are spread out over time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `min_jitter_seconds` | `float` | Minimum jitter (seconds) | `0.0` |
| `max_jitter_seconds` | `float` | Maximum jitter (seconds) | `5.0` |
| `on_reconciled` | `Callable[[DriftReconciliationRecord], None] \| None` | Callback fired after reconciliation (metrics, logging, etc.) | `None` |
get_jitter
get_jitter() -> float
Generate a jitter value, in seconds, between 0 and the configured maximum.
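The idea behind the jitter value can be sketched in a few lines. This is a minimal illustration, not the library's implementation: the function name `sample_jitter` is hypothetical, and the uniform distribution is an assumption, since the reference does not say how the value is drawn.

```python
import random


def sample_jitter(min_seconds: float = 0.0, max_seconds: float = 5.0) -> float:
    """Return a random delay in [min_seconds, max_seconds].

    Hypothetical helper; a uniform distribution is assumed here.
    """
    if min_seconds < 0 or max_seconds < min_seconds:
        raise ValueError("expected 0 <= min_seconds <= max_seconds")
    # random.uniform(a, b) returns a float N with a <= N <= b.
    return random.uniform(min_seconds, max_seconds)
```

Because each pod draws its own delay, writes that would otherwise hit L2 at the same instant end up spread across the jitter window.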
reconcile
reconcile(
service_name: str,
l1_state: str,
l2_state: str,
l1_updated_at: datetime | None = None,
l2_updated_at: datetime | None = None,
) -> tuple[str, DriftReconciliationResult]
Drift resolution strategy:
1. The more restrictive state wins (Most Restrictive Wins).
2. On equal level, the most recent timestamp wins.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `service_name` | `str` | Service name | *required* |
| `l1_state` | `str` | L1 state (closed, half_open, open) | *required* |
| `l2_state` | `str` | L2 state | *required* |
| `l1_updated_at` | `datetime \| None` | L1 last update time | `None` |
| `l2_updated_at` | `datetime \| None` | L2 last update time | `None` |
Returns:
| Type | Description |
|---|---|
| `tuple[str, DriftReconciliationResult]` | (winning state, reconciliation result) |
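The two resolution rules can be sketched as a standalone function. This is an illustration under stated assumptions, not the `reconcile` implementation: `pick_winner` and the `"l1"`/`"l2"` source labels are hypothetical (the members of `DriftReconciliationResult` are not listed here), a missing timestamp is treated as the oldest possible, exact ties fall back to L2, and timestamps are assumed to be timezone-aware.

```python
from datetime import datetime, timezone
from typing import Optional

# Priority from the reference: OPEN (3) > HALF_OPEN (2) > CLOSED (1).
PRIORITY = {"closed": 1, "half_open": 2, "open": 3}

# Assumption: a missing timestamp loses against any real timestamp.
_OLDEST = datetime.min.replace(tzinfo=timezone.utc)


def pick_winner(
    l1_state: str,
    l2_state: str,
    l1_updated_at: Optional[datetime] = None,
    l2_updated_at: Optional[datetime] = None,
) -> tuple:
    """Return (winning_state, source), where source is "l1" or "l2"."""
    p1, p2 = PRIORITY[l1_state], PRIORITY[l2_state]
    # Rule 1: the more restrictive state wins.
    if p1 != p2:
        return (l1_state, "l1") if p1 > p2 else (l2_state, "l2")
    # Rule 2: on equal level, the most recent timestamp wins.
    t1 = l1_updated_at or _OLDEST
    t2 = l2_updated_at or _OLDEST
    # Assumption: an exact tie is resolved in favor of L2.
    return (l1_state, "l1") if t1 > t2 else (l2_state, "l2")
```

For example, `pick_winner("open", "closed")` selects the L1 copy because OPEN outranks CLOSED, regardless of which side was updated last.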
schedule_reconciliation_sync
schedule_reconciliation_sync(
service_name: str, do_reconcile: Callable[[], None]
) -> float
Apply jitter, sleep synchronously, then run reconciliation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `service_name` | `str` | Service name | *required* |
| `do_reconcile` | `Callable[[], None]` | Function that performs the actual reconciliation | *required* |
Returns:
| Type | Description |
|---|---|
| `float` | Applied jitter (seconds) |
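The "jitter, sleep, then reconcile" sequence might look roughly like the sketch below. The function name `run_after_jitter` and the injectable `sleep` parameter are hypothetical additions for illustration (the latter makes the sketch testable without waiting), and the uniform jitter is an assumption.

```python
import random
import time
from typing import Callable


def run_after_jitter(
    do_reconcile: Callable[[], None],
    min_seconds: float = 0.0,
    max_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> float:
    """Sleep for a random delay, run the callback, return the delay used."""
    jitter = random.uniform(min_seconds, max_seconds)
    sleep(jitter)      # blocks the calling thread for the jitter duration
    do_reconcile()     # the caller-supplied reconciliation step
    return jitter
```

Returning the applied delay lets the caller log it or attach it to a reconciliation record.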
schedule_reconciliation_async
async
schedule_reconciliation_async(
service_name: str, do_reconcile: Callable[[], None]
) -> float
Apply jitter, sleep asynchronously, then run reconciliation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `service_name` | `str` | Service name | *required* |
| `do_reconcile` | `Callable[[], None]` | Function that performs the actual reconciliation | *required* |
Returns:
| Type | Description |
|---|---|
| `float` | Applied jitter (seconds) |
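An asynchronous variant of the same sequence can be sketched with `asyncio.sleep`, which suspends only the current task instead of blocking the thread. The name `run_after_jitter_async` is hypothetical and the uniform jitter is an assumption; this is not the library's code.

```python
import asyncio
import random
from typing import Callable


async def run_after_jitter_async(
    do_reconcile: Callable[[], None],
    min_seconds: float = 0.0,
    max_seconds: float = 5.0,
) -> float:
    """Await a random delay, run the callback, return the delay used."""
    jitter = random.uniform(min_seconds, max_seconds)
    # Other tasks on the event loop keep running during this wait.
    await asyncio.sleep(jitter)
    # The callback is synchronous, matching Callable[[], None].
    do_reconcile()
    return jitter
```

Since the callback is a plain synchronous callable, a slow `do_reconcile` would still hold the event loop while it runs; in this sketch that trade-off is left to the caller.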
get_history
get_history() -> list[DriftReconciliationRecord]
Return the reconciliation history.
get_stats
get_stats() -> dict[str, Any]
Return reconciliation statistics.
clear_history
clear_history() -> None
Clear history (for tests).
DriftReconciliationResult
Bases: str, Enum
Drift reconciliation outcome.
DriftReconciliationRecord
dataclass
DriftReconciliationRecord(
service_name: str,
l1_state: str,
l2_state: str,
l1_updated_at: datetime | None,
l2_updated_at: datetime | None,
winner: str,
result: DriftReconciliationResult,
reconciled_at: datetime = (lambda: utc_now())(),
jitter_seconds: float = 0.0,
)
Drift reconciliation record.
Captures the resolution of an L1/L2 state mismatch after L2 recovery.
get_drift_reconciler
get_drift_reconciler() -> DriftReconciler
Get the singleton DriftReconciler instance.
Shadow logging
ShadowLogger
Records state changes locally during an L2 outage.
The Shadow Log keeps an in-memory record of every state change that occurs while L2 is in a failed state, supporting resynchronization after L2 recovery and forensic analysis.
The implementation is thread-safe under concurrent access.
reset_instance
classmethod
reset_instance() -> None
Reset singleton instance for test isolation.
set_max_entries
set_max_entries(max_entries: int) -> None
Set the maximum number of entries to keep.
record_sync_failure
record_sync_failure(
service_name: str,
intended_state: str,
error: Exception,
adapter_type: str = "unknown",
operation: str = "sync",
) -> None
Record an L2 sync failure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `service_name` | `str` | Service name | *required* |
| `intended_state` | `str` | The state we tried to sync | *required* |
| `error` | `Exception` | Raised exception | *required* |
| `adapter_type` | `str` | L2 adapter type (redis, django, etc.) | `'unknown'` |
| `operation` | `str` | Operation kind (sync, update, delete) | `'sync'` |
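A thread-safe, size-capped in-memory log of this kind could be sketched as follows. `FailureEntry` and `BoundedFailureLog` are hypothetical stand-ins, not the real `L2SyncFailureRecord` and `ShadowLogger`. Dropping the oldest entry when the cap is reached, the default cap of 1000, and the error message format are all assumptions.

```python
import threading
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class FailureEntry:
    """Hypothetical stand-in with a subset of L2SyncFailureRecord's fields."""

    service_name: str
    intended_state: str
    failure_time: datetime
    error_message: str
    adapter_type: str = "unknown"
    operation: str = "sync"


class BoundedFailureLog:
    """Hypothetical thread-safe, size-capped, in-memory failure log."""

    def __init__(self, max_entries: int = 1000) -> None:
        self._lock = threading.Lock()
        # A deque with maxlen drops its oldest item when a new one is appended.
        self._entries = deque(maxlen=max_entries)

    def set_max_entries(self, max_entries: int) -> None:
        with self._lock:
            # Rebuilding the deque keeps only the newest entries when shrinking.
            self._entries = deque(self._entries, maxlen=max_entries)

    def record_sync_failure(
        self, service_name, intended_state, error,
        adapter_type="unknown", operation="sync",
    ) -> None:
        entry = FailureEntry(
            service_name=service_name,
            intended_state=intended_state,
            failure_time=datetime.now(timezone.utc),
            error_message=f"{type(error).__name__}: {error}",
            adapter_type=adapter_type,
            operation=operation,
        )
        with self._lock:
            self._entries.append(entry)

    def get_all_records(self) -> list:
        with self._lock:
            # Return a copy of the container so iteration is safe for callers.
            return list(self._entries)
```

Building the entry outside the lock keeps the critical section down to a single append.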
get_unsynced_records
get_unsynced_records() -> list[L2SyncFailureRecord]
List records that have not yet been synced.
get_all_records
get_all_records() -> list[L2SyncFailureRecord]
List all records.
mark_as_synced
mark_as_synced(service_name: str) -> int
Mark records as synced after recovery.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `service_name` | `str` | Service name | *required* |
Returns:
| Type | Description |
|---|---|
| `int` | Number of records marked |
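The marking step can be illustrated on a plain list of records. `FailureEntry` and `mark_service_synced` are hypothetical; the sketch assumes that records already marked are not counted again and that the recovery time is stamped at marking time, neither of which the reference states.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FailureEntry:
    """Hypothetical stand-in with a subset of L2SyncFailureRecord's fields."""

    service_name: str
    synced_after_recovery: bool = False
    recovery_time: Optional[datetime] = None


def mark_service_synced(records: list, service_name: str) -> int:
    """Flag unsynced records of one service and return how many were flagged."""
    now = datetime.now(timezone.utc)
    marked = 0
    for record in records:
        if record.service_name == service_name and not record.synced_after_recovery:
            record.synced_after_recovery = True
            record.recovery_time = now  # assumption: stamped when marked
            marked += 1
    return marked
```

Under these assumptions a second call for the same service returns 0, which makes a replay loop safe to retry.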
mark_all_as_synced
mark_all_as_synced() -> int
Mark every unsynced record as synced.
get_stats
get_stats() -> dict[str, Any]
Read Shadow Log statistics.
clear
clear() -> None
Clear all records (for testing).
analyze_l2_failures
analyze_l2_failures() -> dict[str, Any]
Analyze state changes during the L2 outage window.
Integrates with the Forensic Advisor to present the state changes that occurred during the L2 outage as a timeline.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Analysis result dictionary |
get_records_by_service
get_records_by_service(
service_name: str,
) -> list[L2SyncFailureRecord]
List failure records for a given service.
get_records_by_time_range
get_records_by_time_range(
start_time: datetime, end_time: datetime
) -> list[L2SyncFailureRecord]
List failure records within the given time range.
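A time-range query over failure records can be sketched as a simple filter. `FailureEntry` and `records_in_range` are hypothetical names, and treating both bounds as inclusive is an assumption; the reference does not specify the boundary behavior.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class FailureEntry:
    """Hypothetical stand-in with a subset of L2SyncFailureRecord's fields."""

    service_name: str
    failure_time: datetime


def records_in_range(records: list, start_time: datetime, end_time: datetime) -> list:
    """Return records whose failure_time lies in [start_time, end_time]."""
    if start_time > end_time:
        raise ValueError("start_time must not be after end_time")
    # Assumption: both bounds are inclusive.
    return [r for r in records if start_time <= r.failure_time <= end_time]
```

Mixing naive and timezone-aware datetimes in the comparison raises `TypeError` in Python, so the bounds should use the same convention as the stored `failure_time` values.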
L2SyncFailureRecord
dataclass
L2SyncFailureRecord(
service_name: str,
intended_state: str,
failure_time: datetime,
error_message: str,
l1_state_at_failure: str,
adapter_type: str = "unknown",
operation: str = "sync",
synced_after_recovery: bool = False,
recovery_time: datetime | None = None,
)
L2 sync failure record.
Records state changes that occurred during an L2 outage to support forensic analysis and resynchronization after recovery.
get_shadow_logger
get_shadow_logger() -> ShadowLogger
Get the singleton ShadowLogger instance.