baldur — Types & Service Access
Core public data types and the entry points for resolving services: the
ProviderRegistry, the circuit-breaker service accessor, and the replay
service.
Core types
CircuitState
Bases: str, Enum
Circuit breaker states
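Because CircuitState subclasses both str and Enum, its members compare equal to their string values and serialize directly to JSON. The sketch below shows that pattern. The member names CLOSED, OPEN and HALF_OPEN are the conventional circuit-breaker states and are assumed here; they are not taken from the baldur source.

```python
import json
from enum import Enum


class CircuitStateSketch(str, Enum):
    """Hypothetical stand-in for CircuitState; member names are assumed."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# The str mixin makes members compare equal to plain strings.
is_open = CircuitStateSketch.OPEN == "open"
# Look up a member from a stored string value.
state = CircuitStateSketch("half_open")
# json.dumps emits the underlying string value: '{"state": "closed"}'
payload = json.dumps({"state": CircuitStateSketch.CLOSED})
```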
FailedOperationData
dataclass
FailedOperationData(
id: str,
domain: str,
failure_type: str,
status: str,
entity_type: str | None = None,
entity_id: str | None = None,
entity_refs: dict[str, Any] = dict(),
user_id: int | None = None,
snapshot_data: dict[str, Any] = dict(),
error_code: str = "",
error_message: str = "",
retry_count: int = 0,
max_retries: int = 2,
last_retry_at: datetime | None = None,
request_data: dict[str, Any] = dict(),
response_data: dict[str, Any] = dict(),
metadata: dict[str, Any] = dict(),
resolved_at: datetime | None = None,
resolved_by_id: int | None = None,
resolution_type: str = "",
resolution_note: str = "",
next_action_hint: str = "",
recommended_action: str = "",
created_at: datetime | None = None,
updated_at: datetime | None = None,
expires_at: datetime | None = None,
)
Data transfer object for the FailedOperation model.
Contains every field needed for DLQ operations, without depending on Django models.
is_pending
property
is_pending: bool
Check if operation is pending review
is_resolved
property
is_resolved: bool
Check if operation is resolved
can_retry
property
can_retry: bool
Check if operation can be retried
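The three properties are derived from the status, retry_count and max_retries fields. The sketch below shows one plausible derivation on a reduced copy of the dataclass. The status strings ("pending", "resolved") and the retry rule are assumptions for illustration, since this page does not list the actual status values. It also shows why the rendered `dict()` defaults are per-instance: dataclasses reject a bare mutable default, so fresh dicts come from field(default_factory=dict).

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class FailedOperationSketch:
    """Hypothetical reduced version of FailedOperationData."""

    id: str
    domain: str
    failure_type: str
    status: str
    retry_count: int = 0
    max_retries: int = 2
    # default_factory gives each instance its own dict, never a shared one.
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_pending(self) -> bool:
        # Assumed status value for "awaiting review".
        return self.status == "pending"

    @property
    def is_resolved(self) -> bool:
        # Assumed status value for "resolved".
        return self.status == "resolved"

    @property
    def can_retry(self) -> bool:
        # Assumed rule: unresolved and still under the retry budget.
        return not self.is_resolved and self.retry_count < self.max_retries
```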
Service access
ProviderRegistry
Central registry for all pluggable components.
Each adapter/repository/strategy type is a GenericProviderRegistry class attribute. Thread-safe singleton creation is handled by GenericProviderRegistry's double-checked locking (DCL) pattern.
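Double-checked locking reads the instance cache once without the lock; only on a miss does it take the lock and re-check before constructing, so concurrent first calls still build exactly one instance. A minimal sketch of the pattern for a name-keyed registry follows. SketchRegistry and its method names are hypothetical and are not GenericProviderRegistry's actual API.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class SketchRegistry:
    """Hypothetical name-keyed registry with double-checked-locking singletons."""

    def __init__(self, default: str | None = None) -> None:
        self._providers: dict[str, Callable[[], Any]] = {}
        self._instances: dict[str, Any] = {}
        self._lock = threading.Lock()
        self.default = default

    def register(self, name: str, provider: Callable[[], Any]) -> None:
        self._providers[name] = provider

    def get(self, name: str | None = None, singleton: bool = True) -> Any:
        key = name or self.default
        if key is None or key not in self._providers:
            raise KeyError(f"no provider registered for {key!r}")
        if not singleton:
            return self._providers[key]()  # fresh instance on every call
        # First check: lock-free fast path once the instance exists.
        instance = self._instances.get(key)
        if instance is None:
            with self._lock:
                # Second check: another thread may have built it meanwhile.
                instance = self._instances.get(key)
                if instance is None:
                    instance = self._providers[key]()
                    self._instances[key] = instance
        return instance
```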
Sub-registries
- Adapters: cache, queue, async_queue, audit, traffic_routing, notification, alert, rate_limit_storage
- Repositories: failed_op_repo, circuit_breaker_repo, security_repo, event_journal_repo, mesh_override_store
- Strategies: correlation_strategy, root_cause_strategy, graph_build_strategy
- ML strategies: anomaly_detection, forecast, classification, optimization
- Feature services: finops_service, learning_service, compliance_engine, predictive_forecaster_service
- Start callables: worker_background_starts
- Lifecycle callables: shutdown_integrations, startup_integrations
Special (non-generic) registries: statistics, a singleton adapter that is not looked up by name.
get_cache
classmethod
get_cache(
name: str | None = None, singleton: bool = True
) -> CacheProviderInterface
Get a cache provider instance, wrapped with a metrics decorator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Provider name (e.g., 'redis', 'memory') | None |
| singleton | bool | If True, return cached instance | True |

Returns:
| Type | Description |
|---|---|
| CacheProviderInterface | CacheProviderInterface instance (metrics-wrapped) |
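Wrapping the provider in a metrics decorator lets callers keep the normal cache interface while calls are counted. A minimal sketch of such a wrapper follows, assuming a simple get/set interface. MetricsCacheProxy, InMemoryCache and the counter names are illustrative; they are not baldur's actual decorator or the CacheProviderInterface method set.

```python
from __future__ import annotations

from collections import Counter
from typing import Any


class InMemoryCache:
    """Hypothetical minimal cache with get/set (not CacheProviderInterface)."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


class MetricsCacheProxy:
    """Wraps a get/set cache and counts hits, misses and writes."""

    def __init__(self, inner: InMemoryCache) -> None:
        self._inner = inner
        self.metrics: Counter[str] = Counter()

    def get(self, key: str) -> Any:
        value = self._inner.get(key)
        self.metrics["hit" if value is not None else "miss"] += 1
        return value

    def set(self, key: str, value: Any) -> None:
        self.metrics["set"] += 1
        self._inner.set(key, value)

    def __getattr__(self, name: str) -> Any:
        # Anything not instrumented is delegated to the wrapped provider.
        return getattr(self._inner, name)
```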
get_audit_adapter
classmethod
get_audit_adapter(
name: str | None = None, singleton: bool = True
) -> AuditLogAdapter
Get audit adapter instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Adapter name (e.g., 'file', 'stdout', 'null') | None |
| singleton | bool | If True, return cached instance | True |

Returns:
| Type | Description |
|---|---|
| AuditLogAdapter | AuditLogAdapter instance |
get_event_journal_repo
classmethod
get_event_journal_repo(
name: str | None = None, singleton: bool = True
) -> EventJournalRepository
Get event journal repository instance.
When name is None the registry default is used. The default is
"memory" at module load and is rewired by init() to
"redis"/"sql"/"memory" via the event_journal_repo
PRIORITY_CHAIN row, honoring BALDUR_REDIS_URL and the
BALDUR_EVENT_JOURNAL_BACKEND operator override. Passing an
explicit name bypasses that wired default.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Repository name (e.g., 'memory', 'redis', 'sql') | None |
| singleton | bool | If True, return cached instance | True |

Returns:
| Type | Description |
|---|---|
| EventJournalRepository | EventJournalRepository instance |
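The default-backend wiring described above amounts to a priority chain with an operator override. The sketch below is one reading of that behavior under stated assumptions: the override wins whenever it is set, Redis is chosen when BALDUR_REDIS_URL is present, and the hypothetical sql_available flag stands in for whatever condition selects the SQL backend. It is not baldur's init() code.

```python
from __future__ import annotations

from collections.abc import Mapping


def resolve_event_journal_backend(
    env: Mapping[str, str], sql_available: bool = False
) -> str:
    """Pick the default event journal backend name (illustrative only)."""
    override = env.get("BALDUR_EVENT_JOURNAL_BACKEND", "").strip().lower()
    if override:
        return override  # operator override beats auto-detection
    if env.get("BALDUR_REDIS_URL"):
        return "redis"
    if sql_available:  # hypothetical stand-in for the SQL condition
        return "sql"
    return "memory"  # the module-load default
```

Passing an explicit name to get_event_journal_repo skips this resolution entirely.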
register_statistics_adapter
classmethod
register_statistics_adapter(
adapter: StatisticsRepositoryInterface,
) -> None
Register a statistics adapter.
Should be called during app initialization (e.g., Django's AppConfig.ready()). Only one statistics adapter can be registered at a time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| adapter | StatisticsRepositoryInterface | StatisticsRepositoryInterface implementation | required |
get_statistics_repo
classmethod
get_statistics_repo() -> StatisticsRepositoryInterface
Get statistics repository instance.
Returns the registered statistics adapter, or NullStatisticsRepository if no adapter is registered.
Returns:
| Type | Description |
|---|---|
| StatisticsRepositoryInterface | StatisticsRepositoryInterface instance |
has_statistics_adapter
classmethod
has_statistics_adapter() -> bool
Check if a statistics adapter is registered.
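Unlike the name-based registries, statistics uses a single adapter slot with a null-object fallback: the getter never returns None, and has_statistics_adapter lets callers tell the fallback apart from a real adapter. A minimal sketch of that slot follows. StatisticsSlot, NullStats and the count method are hypothetical, and replacing an earlier registration (rather than rejecting it) is an assumption.

```python
from __future__ import annotations

from typing import Protocol


class StatsRepo(Protocol):
    def count(self, domain: str) -> int: ...


class NullStats:
    """Null object: safe to call, always reports zero."""

    def count(self, domain: str) -> int:
        return 0


class StatisticsSlot:
    """Hypothetical single-slot registry (not ProviderRegistry itself)."""

    _adapter: StatsRepo | None = None

    @classmethod
    def register(cls, adapter: StatsRepo) -> None:
        cls._adapter = adapter  # assumed: a later registration replaces it

    @classmethod
    def get(cls) -> StatsRepo:
        return cls._adapter if cls._adapter is not None else NullStats()

    @classmethod
    def has_adapter(cls) -> bool:
        return cls._adapter is not None
```

In a Django project the register call would sit in AppConfig.ready(), as recommended for register_statistics_adapter.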
register_postmortem_repo
classmethod
register_postmortem_repo(
name: str, repo_class: type
) -> None
Register a postmortem repository.
get_postmortem_repo
classmethod
get_postmortem_repo(
name: str | None = None, singleton: bool = True
) -> PostmortemRepository
Get postmortem repository instance.
has_postmortem_repo
classmethod
has_postmortem_repo() -> bool
Check if any postmortem repository is registered.
register_cascade_event_repo
classmethod
register_cascade_event_repo(
name: str, repo_class: type
) -> None
Register a cascade event archive repository.
get_cascade_event_repo
classmethod
get_cascade_event_repo(
name: str | None = None, singleton: bool = True
) -> CascadeEventArchiveRepository
Get cascade event archive repository instance.
register_recovery_session_repo
classmethod
register_recovery_session_repo(
name: str, repo_class: type
) -> None
Register a recovery session archive repository.
get_recovery_session_repo
classmethod
get_recovery_session_repo(
name: str | None = None, singleton: bool = True
) -> RecoverySessionArchiveRepository
Get recovery session archive repository instance.
register_cache
classmethod
register_cache(name: str, provider_class: type) -> None
Register a cache provider adapter.
register_queue
classmethod
register_queue(name: str, provider_class: type) -> None
Register a task queue adapter.
register_async_queue
classmethod
register_async_queue(
name: str, adapter_class: type
) -> None
Register an async task queue adapter.
register_failed_operation_repo
classmethod
register_failed_operation_repo(
name: str, repo_class: type
) -> None
Register a failed operation repository.
register_circuit_breaker_repo
classmethod
register_circuit_breaker_repo(
name: str, repo_class: type
) -> None
Register a circuit breaker state repository.
register_security_repo
classmethod
register_security_repo(name: str, repo_class: type) -> None
Register a security incident repository.
register_event_journal_repo
classmethod
register_event_journal_repo(
name: str, repo_class: type
) -> None
Register an event journal repository.
register_audit_adapter
classmethod
register_audit_adapter(
name: str, adapter_class: type
) -> None
Register an audit log adapter.
register_traffic_routing
classmethod
register_traffic_routing(
name: str, adapter_class: type
) -> None
Register a traffic routing adapter.
register_notification
classmethod
register_notification(
name: str, adapter_class: type | Callable[..., Any]
) -> None
Register a notification adapter (class or zero-arg factory).
register_alert
classmethod
register_alert(
name: str, factory: type | Callable[..., Any]
) -> None
Register an alert adapter (class or zero-arg factory).
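register_notification and register_alert accept either a class or a zero-argument factory. Both are callables that produce an instance when called with no arguments, so a registry can store them side by side. The sketch below illustrates this; SlackNotifier, WebhookAlert, make_webhook_alert, the register/build helpers and the URL are all hypothetical.

```python
from __future__ import annotations

from typing import Any, Callable


class SlackNotifier:
    """Hypothetical adapter with a no-argument constructor."""

    def send(self, message: str) -> str:
        return f"slack: {message}"


class WebhookAlert:
    """Hypothetical adapter that needs configuration at construction time."""

    def __init__(self, url: str) -> None:
        self.url = url


def make_webhook_alert() -> WebhookAlert:
    # Zero-arg factory: binds the configuration the constructor requires.
    return WebhookAlert(url="https://alerts.example.invalid/hook")


_factories: dict[str, type | Callable[..., Any]] = {}


def register(name: str, factory: type | Callable[..., Any]) -> None:
    _factories[name] = factory


def build(name: str) -> Any:
    # A class and a zero-arg function are invoked the same way.
    return _factories[name]()
```

A factory is the natural choice when an adapter's constructor needs arguments that the registry cannot supply on its own.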
register_correlation_strategy
classmethod
register_correlation_strategy(
name: str, strategy_class: type
) -> None
Register a correlation strategy.
register_root_cause_strategy
classmethod
register_root_cause_strategy(
name: str, strategy_class: type
) -> None
Register a root cause strategy.
register_graph_build_strategy
classmethod
register_graph_build_strategy(
name: str, strategy_class: type
) -> None
Register a graph build strategy.
register_mesh_override_store
classmethod
register_mesh_override_store(
name: str, store_class: type
) -> None
Register a mesh override store implementation.
get_queue
classmethod
get_queue(
name: str | None = None, singleton: bool = True
) -> TaskQueueInterface
Get task queue instance.
get_async_queue
classmethod
get_async_queue(
name: str | None = None, singleton: bool = True
) -> AsyncTaskQueueInterface
Get async task queue instance.
get_failed_operation_repo
classmethod
get_failed_operation_repo(
name: str | None = None, singleton: bool = True
) -> FailedOperationRepository
Get failed operation repository instance.
get_circuit_breaker_repo
classmethod
get_circuit_breaker_repo(
name: str | None = None, singleton: bool = True
) -> CircuitBreakerStateRepository
Get circuit breaker state repository instance.
get_security_repo
classmethod
get_security_repo(
name: str | None = None, singleton: bool = True
) -> SecurityIncidentRepository
Get security incident repository instance.
get_traffic_routing
classmethod
get_traffic_routing(
name: str | None = None, singleton: bool = True
) -> TrafficRoutingAdapter
Get traffic routing adapter instance.
get_notification
classmethod
get_notification(
name: str | None = None,
) -> NotificationAdapter
Get notification adapter instance.
get_alert
classmethod
get_alert(name: str | None = None) -> AlertAdapter
Get alert adapter instance.
get_mesh_override_store
classmethod
get_mesh_override_store(
name: str | None = None, singleton: bool = True
) -> Any
Get a mesh override store instance.
Vestigial API-compat accessor: the store implementation moved to
the private distribution with the circuit_mesh feature, so the
slot is empty on OSS installs and this raises AdapterNotFoundError
unless a provider was registered via register_mesh_override_store.
get_correlation_strategy
classmethod
get_correlation_strategy(
name: str,
) -> type | Callable[..., Any]
Get registered correlation strategy class (not instance).
Strategy registries store class references that callers instantiate themselves, so the provider class is returned directly rather than through GenericProviderRegistry.get(), which would instantiate it.
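Returning the class rather than an instance lets each caller construct the strategy with its own arguments. The sketch below shows two callers configuring the same registered class differently. TimeWindowCorrelation, its window_seconds parameter and the get_strategy helper are hypothetical.

```python
from __future__ import annotations


class TimeWindowCorrelation:
    """Hypothetical correlation strategy configured per caller."""

    def __init__(self, window_seconds: int) -> None:
        self.window_seconds = window_seconds

    def correlate(self, timestamps: list[float]) -> list[list[float]]:
        # Group sorted timestamps whose gap to the previous one fits the window.
        groups: list[list[float]] = []
        for ts in sorted(timestamps):
            if groups and ts - groups[-1][-1] <= self.window_seconds:
                groups[-1].append(ts)
            else:
                groups.append([ts])
        return groups


strategies: dict[str, type] = {"time_window": TimeWindowCorrelation}


def get_strategy(name: str) -> type:
    # Return the class itself; instantiation is left to the caller.
    return strategies[name]


# Two callers, two differently configured instances of the same strategy.
strict = get_strategy("time_window")(window_seconds=5)
loose = get_strategy("time_window")(window_seconds=60)
```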
get_root_cause_strategy
classmethod
get_root_cause_strategy(
name: str,
) -> type | Callable[..., Any]
Get registered root cause strategy class (not instance).
get_graph_build_strategy
classmethod
get_graph_build_strategy(
name: str,
) -> type | Callable[..., Any]
Get registered graph build strategy class (not instance).
set_defaults
classmethod
set_defaults(
cache: str | None = None,
queue: str | None = None,
repo: str | None = None,
) -> None
Set default providers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cache | str \| None | Default cache provider name | None |
| queue | str \| None | Default task queue name | None |
| repo | str \| None | Default repository name (applied to all repo registries) | None |
get_defaults
classmethod
get_defaults() -> dict[str, str | None]
Get current default provider names.
list_providers
classmethod
list_providers() -> dict[str, Any]
List all registered providers.
clear_instances
classmethod
clear_instances() -> None
Clear all cached instances (providers remain registered).
reset
classmethod
reset() -> None
Reset registry to initial state (all providers and instances cleared).
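clear_instances and reset differ in what survives: the first drops cached singletons but keeps registrations, while the second returns the registry to its empty initial state. The sketch below shows the difference on a toy registry; ToyRegistry is hypothetical. One plausible use of clear_instances is test teardown, where fresh instances are wanted without re-registering providers.

```python
from __future__ import annotations

from typing import Any, Callable


class ToyRegistry:
    """Hypothetical registry illustrating clear_instances() vs reset()."""

    def __init__(self) -> None:
        self.providers: dict[str, Callable[[], Any]] = {}
        self.instances: dict[str, Any] = {}

    def register(self, name: str, provider: Callable[[], Any]) -> None:
        self.providers[name] = provider

    def get(self, name: str) -> Any:
        if name not in self.instances:
            self.instances[name] = self.providers[name]()
        return self.instances[name]

    def clear_instances(self) -> None:
        self.instances.clear()  # registrations survive

    def reset(self) -> None:
        self.instances.clear()
        self.providers.clear()  # back to the initial, empty state
```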
health_check_all
classmethod
health_check_all() -> dict[str, bool]
Run health checks on all default providers.
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict mapping provider type to health status |
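health_check_all returns one boolean per provider type. The sketch below shows one way such a sweep could be aggregated, assuming each provider exposes a health_check() method returning bool and treating a raised exception as unhealthy. Both the method name and the exception policy are assumptions, as are the Healthy and Broken example classes.

```python
from __future__ import annotations

from typing import Any


def health_check_all(providers: dict[str, Any]) -> dict[str, bool]:
    """Map provider type to health status (illustrative sketch)."""
    results: dict[str, bool] = {}
    for provider_type, provider in providers.items():
        try:
            results[provider_type] = bool(provider.health_check())
        except Exception:
            # Assumed policy: a check that raises counts as unhealthy.
            results[provider_type] = False
    return results


class Healthy:
    def health_check(self) -> bool:
        return True


class Broken:
    def health_check(self) -> bool:
        raise ConnectionError("backend unreachable")
```

In this sketch one failing check does not abort the sweep, so every provider is reported in a single call.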
get_circuit_breaker_service
get_circuit_breaker_service() -> CircuitBreakerService
Return the runtime-scoped CircuitBreakerService singleton.
Delegates to the active BaldurRuntime (baldur.runtime.BaldurRuntime), so test
isolation, copy_context() scoping, and runtime swap-in work transparently
through the runtime's singleton store.
This is an explicit def wrapper, rather than tuple-unpacking the
make_singleton_factory return value directly, so that the symbol is
statically discoverable by docs tooling (mkdocstrings/griffe) and carries
a public-surface docstring, as the reference page contract requires.
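Because the service is resolved through the active runtime rather than a module global, swapping the runtime swaps the singleton, and copy_context() keeps a swap local to one context. The sketch below models that with contextvars. Runtime, _active_runtime, get_service and in_isolated_runtime are hypothetical stand-ins for BaldurRuntime and get_circuit_breaker_service, not their actual code.

```python
from __future__ import annotations

import contextvars
from typing import Any, Callable


class Runtime:
    """Hypothetical runtime holding its own singleton store."""

    def __init__(self) -> None:
        self._singletons: dict[str, Any] = {}

    def singleton(self, key: str, factory: Callable[[], Any]) -> Any:
        if key not in self._singletons:
            self._singletons[key] = factory()
        return self._singletons[key]


_default_runtime = Runtime()
_active_runtime: contextvars.ContextVar[Runtime] = contextvars.ContextVar(
    "active_runtime", default=_default_runtime
)


class CircuitBreakerServiceSketch:
    pass


def get_service() -> CircuitBreakerServiceSketch:
    # Resolve through whichever runtime is active in the current context.
    return _active_runtime.get().singleton("cb", CircuitBreakerServiceSketch)


def in_isolated_runtime() -> CircuitBreakerServiceSketch:
    _active_runtime.set(Runtime())  # only affects the copied context
    return get_service()
```

Running in_isolated_runtime via contextvars.copy_context().run(...) yields a separate service instance while the outer context keeps its own.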
Replay
ReplayService
ReplayService(
repository: FailedOperationRepository | None = None,
cache: CacheProviderInterface | None = None,
)
Bases: EventEmitterMixin
DLQ Replay Service.
Orchestrates replay operations for failed operations.
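Usage:
```python
from unittest.mock import Mock

# ReplayService and FailedOperationRepository come from baldur (module path not shown here).
service = ReplayService()
# Single replay (dlq_id is a str, per the replay_single signature)
result = service.replay_single(dlq_id="123")
# Batch replay
batch_result = service.replay_batch(failure_type="PG_TIMEOUT", max_items=50)
# For testing with a mock repository
mock_repo = Mock(spec=FailedOperationRepository)
service = ReplayService(repository=mock_repo)
```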
Initialize the replay service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| repository | FailedOperationRepository \| None | Optional repository for dependency injection; the Django adapter is used if None | None |
| cache | CacheProviderInterface \| None | Optional cache provider for the per-service inflight lock guarding circuit-close replays | None |
repository
property
repository: FailedOperationRepository
Get the repository using ProviderRegistry with fallback policy.
cache
property
cache: CacheProviderInterface | None
Lazy-resolve the cache provider for the circuit-close inflight lock.
Returns None if no provider can be resolved; the caller fails open in that
case. The inflight lock uses cache.get_lock() (an owner-fenced
DistributedLock), so adapter-level lock support is validated at acquire
time rather than via a separate setnx gate.
getattr with defaults handles test fixtures that bypass __init__
via ReplayService.__new__(...). A bypassed-init instance is
observationally identical to a fresh instance with cache=None for
this fail-open guard.
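Failing open means a missing cache provider never blocks a circuit-close replay; a duplicate call can only be rejected when a lock is actually available. The sketch below models that decision. The get_lock(name) and acquire(blocking=False) calls mimic the shape of threading.Lock and are assumptions, not the confirmed DistributedLock API; FakeCache, run_with_inflight_guard and the lock key format are hypothetical.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class FakeCache:
    """Hypothetical cache whose get_lock() hands out per-name locks."""

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}

    def get_lock(self, name: str) -> threading.Lock:
        return self._locks.setdefault(name, threading.Lock())


def run_with_inflight_guard(
    cache: Any | None, service_name: str, work: Callable[[], str]
) -> str:
    if cache is None:
        return work()  # fail open: no cache, no guard
    lock = cache.get_lock(f"replay-inflight:{service_name}")
    if not lock.acquire(blocking=False):
        return "skipped: duplicate in flight"
    try:
        return work()
    finally:
        lock.release()
```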
replay_single
replay_single(dlq_id: str) -> ReplayResult
Replay a single DLQ entry.
This method uses atomic acquisition to prevent race conditions when multiple workers try to replay the same entry simultaneously.
Safety checks (via check_all_governance):
1. Kill Switch: system-wide deactivation check
2. Emergency Level: blocked at LEVEL_2+ to protect resources
3. ErrorBudgetGate: automation blocked when the error budget is exhausted
Audit logging:
- Blocks are automatically recorded in the AuditLog.
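The sketch below models the three safety checks plus audit-on-block, assuming the gates are evaluated in the listed order and the first block wins. GovernanceState, check_governance, the integer emergency level and the reason strings are hypothetical and do not reproduce check_all_governance's actual signature or return type.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class GovernanceState:
    """Hypothetical snapshot of the three gates."""

    kill_switch_active: bool = False
    emergency_level: int = 0  # 2 stands for LEVEL_2
    error_budget_exhausted: bool = False
    audit_log: list[str] = field(default_factory=list)


def check_governance(state: GovernanceState) -> str | None:
    """Return the first blocking reason, or None if replay may proceed."""
    if state.kill_switch_active:
        reason = "kill_switch"
    elif state.emergency_level >= 2:
        reason = "emergency_level"
    elif state.error_budget_exhausted:
        reason = "error_budget"
    else:
        return None
    state.audit_log.append(f"replay blocked: {reason}")  # blocks are audited
    return reason
```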
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dlq_id | str | ID of the FailedOperation to replay | required |

Returns:
| Type | Description |
|---|---|
| ReplayResult | ReplayResult indicating success or failure |
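Atomic acquisition means a worker claims an entry with a single check-and-set, so only one of several concurrent workers proceeds. The sketch below models this with an in-process lock guarding a status transition; a database-backed repository would typically use a conditional update instead. ClaimStore and the "pending"/"replaying" status names are hypothetical.

```python
from __future__ import annotations

import threading


class ClaimStore:
    """Hypothetical store where claiming an entry is an atomic check-and-set."""

    def __init__(self, entries: dict[str, str]) -> None:
        self._status = dict(entries)
        self._lock = threading.Lock()

    def try_claim(self, dlq_id: str) -> bool:
        with self._lock:
            if self._status.get(dlq_id) != "pending":
                return False  # already claimed, or not replayable
            self._status[dlq_id] = "replaying"
            return True


store = ClaimStore({"123": "pending"})
wins: list[bool] = []
workers = [
    threading.Thread(target=lambda: wins.append(store.try_claim("123")))
    for _ in range(8)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Exactly one of the eight workers wins the claim; the other seven get False and skip the entry.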
replay_batch
replay_batch(
domain: str | None = None,
failure_type: str | None = None,
max_items: int = 100,
use_adaptive: bool | None = None,
use_priority: bool | None = None,
) -> BatchReplayResult
Replay multiple DLQ entries matching criteria.
Safety checks (via check_all_governance):
1. Kill Switch: system-wide deactivation check
2. Emergency Level: blocked at LEVEL_2+ to protect resources
3. ErrorBudgetGate: automation blocked when the error budget is exhausted
Adaptive mode:
- When adaptive_enabled=True in RuntimeConfig, the batch size is dynamic.
- A high failure rate (>= 20%) reduces the batch size by 20%.
- Three consecutive perfect batches increase the batch size by 5.
Priority mode:
- When priority_enabled=True in RuntimeConfig, domains are processed by priority.
- Critical domains are processed first, then normal, then low.
- Domain-specific max_retries overrides are respected.
Audit logging:
- Blocks are automatically recorded in the AuditLog.
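The adaptive-mode rules can be read as a small state machine over the batch size. The sketch below applies them literally: a failure rate of 20% or more shrinks the size by 20%, and three perfect batches in a row grow it by 5. The minimum and maximum bounds, floor rounding, treating an empty batch as a no-op, and restarting the streak after growth or a partial failure are all assumptions not stated above; AdaptiveBatchSizer is hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class AdaptiveBatchSizer:
    """Hypothetical sizer implementing the adaptive-mode rules."""

    size: int = 50
    min_size: int = 5  # assumed bound
    max_size: int = 200  # assumed bound
    perfect_streak: int = 0

    def record(self, attempted: int, failed: int) -> int:
        """Update the size after a batch and return the next batch size."""
        if attempted == 0:
            return self.size  # nothing to learn from an empty batch
        if failed * 5 >= attempted:  # failure rate >= 20%, in integer math
            self.size = max(self.min_size, self.size * 4 // 5)  # shrink by 20%
            self.perfect_streak = 0
        elif failed == 0:
            self.perfect_streak += 1
            if self.perfect_streak == 3:
                self.size = min(self.max_size, self.size + 5)
                self.perfect_streak = 0  # assumed: streak restarts after growth
        else:
            self.perfect_streak = 0  # assumed: any failure breaks the streak
        return self.size
```

Starting from 50, a batch with 12 failures out of 50 (24%) shrinks the next batch to 40; three clean batches after that raise it to 45.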
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | str \| None | Filter by domain (optional, ignored in priority mode) | None |
| failure_type | str \| None | Filter by failure type (optional) | None |
| max_items | int | Maximum number of items to replay (ignored in adaptive mode) | 100 |
| use_adaptive | bool \| None | Override adaptive mode setting (None = use RuntimeConfig) | None |
| use_priority | bool \| None | Override priority mode setting (None = use RuntimeConfig) | None |

Returns:
| Type | Description |
|---|---|
| BatchReplayResult | BatchReplayResult with summary and individual results |
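In priority mode the domain filter is ignored and domains are drained in critical, normal, low order. The sketch below shows that ordering. The tier names come from the text above; the domain-to-tier mapping, defaulting unknown domains to "normal", and filling one shared item budget tier by tier are assumptions, and plan_priority_replay is hypothetical.

```python
from __future__ import annotations

TIER_ORDER = {"critical": 0, "normal": 1, "low": 2}


def plan_priority_replay(
    pending_by_domain: dict[str, list[str]],
    domain_tier: dict[str, str],
    budget: int,
) -> list[str]:
    """Return entry IDs to replay, highest-priority domains first."""
    # sorted() is stable, so domains in the same tier keep their input order.
    ordered_domains = sorted(
        pending_by_domain,
        key=lambda d: TIER_ORDER[domain_tier.get(d, "normal")],  # assumed default
    )
    plan: list[str] = []
    for domain in ordered_domains:
        for entry_id in pending_by_domain[domain]:
            if len(plan) >= budget:
                return plan
            plan.append(entry_id)
    return plan
```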
replay_on_circuit_close
replay_on_circuit_close(
service_name: str,
max_items: int = 50,
escalate_failures: bool = True,
service_failure_type_map: (
dict[str, list[str]] | None
) = None,
) -> BatchReplayResult
Replay entries when circuit breaker closes.
This is triggered when an external service recovers. Only replays entries related to the recovered service.
IMPORTANT: When triggered by force_close with trigger_replay=True, any replay failures are escalated to REQUIRES_REVIEW status. This is because operator-initiated recovery implies the operator intended to resolve these items, so failures need explicit attention.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service that recovered | required |
| max_items | int | Maximum number of items to replay | 50 |
| escalate_failures | bool | If True, mark failed replays as REQUIRES_REVIEW | True |
| service_failure_type_map | dict[str, list[str]] \| None | Custom mapping of service names to failure types. If None, uses RuntimeConfig fallback. Example: {"my_service": ["TIMEOUT", "CONNECTION_ERROR"]} | None |

Returns:
| Type | Description |
|---|---|
| BatchReplayResult | BatchReplayResult with summary. |
| BatchReplayResult | the per-service inflight lock rejected this call as a duplicate. |
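replay_on_circuit_close narrows the replay to entries whose failure type maps to the recovered service, and with escalate_failures=True it moves failed replays to REQUIRES_REVIEW. The sketch below covers those two steps, assuming entries are plain dicts and that a service absent from the map yields no candidates. FALLBACK_MAP stands in for the RuntimeConfig fallback and, like replay_for_recovered_service and the replay callable, is hypothetical; the inflight lock is not modeled.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical stand-in for the RuntimeConfig fallback mapping.
FALLBACK_MAP: dict[str, list[str]] = {"payments": ["PG_TIMEOUT"]}


def replay_for_recovered_service(
    service_name: str,
    entries: list[dict[str, str]],
    replay: Callable[[dict[str, str]], bool],
    max_items: int = 50,
    escalate_failures: bool = True,
    service_failure_type_map: dict[str, list[str]] | None = None,
) -> dict[str, int]:
    # Only None selects the fallback; an explicit empty map is respected.
    mapping = (
        service_failure_type_map
        if service_failure_type_map is not None
        else FALLBACK_MAP
    )
    wanted = set(mapping.get(service_name, []))
    candidates = [e for e in entries if e["failure_type"] in wanted][:max_items]
    summary = {"succeeded": 0, "failed": 0}
    for entry in candidates:
        if replay(entry):
            summary["succeeded"] += 1
        else:
            summary["failed"] += 1
            if escalate_failures:
                entry["status"] = "REQUIRES_REVIEW"
    return summary
```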