baldur_pro.services.dlq — Dead-Letter Queue
Durable capture and replay of failed operations: the DLQService, the
store_to_dlq entry point, and the DLQ domain models.
🔒 PRO Feature — requires a baldur-pro license
These symbols ship in the baldur-pro distribution. PRO modules import
normally — there is no ImportError. PRO features activate only when
baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system
runs with OSS defaults and register_pro_services() logs
entitlement.pro_registration_skipped.
dlq
Dead Letter Queue (DLQ) Service Package.
Provides centralized DLQ operations for the baldur layer. Handles storage, retrieval, and management of failed operations.
Features:
- Store failed operations with full forensic context
- Query and filter DLQ entries (via Repository pattern)
- Batch replay operations
Admin/Dashboard operations (cleanup, archive, purge, list, entry management)
should be implemented in the host application using Django ORM directly. This package follows domain-free principles.
Status: Public
DLQConfig
dataclass
DLQConfig(
enabled: bool = True,
retention_days: int = 30,
max_replay_attempts: int = 2,
retry_delay: int = 60,
expiry_hours: int = 72,
batch_size: int = 10,
)
DLQ runtime configuration.
Loaded from RuntimeConfigManager when available (PRO tier with hot reload), falling back to the static DLQSettings (Pydantic) defaults on OSS-only installs.
from_settings
classmethod
from_settings() -> DLQConfig
Load configuration from RuntimeConfigManager (preferred) or DLQSettings.
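The preferred-source-then-fallback order described above can be pictured roughly as follows. This is a minimal sketch, not the library's implementation: `load_config` and `runtime_values` are hypothetical stand-ins for whatever RuntimeConfigManager and DLQSettings actually expose, and only the field names and defaults are taken from the signature above.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping, Optional


@dataclass
class DLQConfig:
    # Local stand-in mirroring the documented fields and defaults.
    enabled: bool = True
    retention_days: int = 30
    max_replay_attempts: int = 2
    retry_delay: int = 60
    expiry_hours: int = 72
    batch_size: int = 10


def load_config(runtime_values: Optional[Mapping[str, Any]]) -> DLQConfig:
    """Prefer runtime values when a runtime source exists, else static defaults.

    `runtime_values` is hypothetical; None models an OSS-only install.
    """
    if runtime_values is None:
        return DLQConfig()  # static defaults
    known = {f.name for f in fields(DLQConfig)}
    # Ignore unknown keys so a stray runtime setting cannot break loading.
    overrides = {k: v for k, v in runtime_values.items() if k in known}
    return DLQConfig(**overrides)


oss = load_config(None)
pro = load_config({"max_replay_attempts": 5, "unrelated": "x"})
```

Fields that the runtime source does not override keep their static defaults, which is one plausible reading of "falling back".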
DLQEntryResult
dataclass
DLQEntryResult(
success: bool,
dlq_id: str | None = None,
error: str | None = None,
fallback_path: str | None = None,
)
Outcome of a single DLQ store/push operation.
fallback_path
class-attribute
instance-attribute
fallback_path: str | None = None
Local fallback path when the DB write failed but data was preserved.
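A caller will usually want to distinguish three outcomes: stored, preserved in the local fallback, and lost. The sketch below uses a local stand-in with the documented fields; `describe` is a hypothetical helper, and whether `success` is True or False when `fallback_path` is set is not stated here, so the sketch checks `fallback_path` first.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DLQEntryResult:
    # Local stand-in mirroring the documented fields.
    success: bool
    dlq_id: Optional[str] = None
    error: Optional[str] = None
    fallback_path: Optional[str] = None


def describe(result: DLQEntryResult) -> str:
    """Turn a store outcome into a one-line log message."""
    if result.fallback_path is not None:
        # The DB write failed but the payload was preserved locally.
        return f"DLQ write fell back to {result.fallback_path}: {result.error}"
    if result.success:
        return f"stored as DLQ entry {result.dlq_id}"
    return f"DLQ store failed: {result.error}"
```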
DLQBatchReplayStats
dataclass
DLQBatchReplayStats(
processed: int = 0,
success: int = 0,
failed: int = 0,
skipped: int = 0,
errors: list[str] = list(),
)
Result of a batch replay operation (statistics).
Note: This is for batch DLQ replay statistics, not to be confused with ReplayResult in replay_service.py which is for single replay outcomes.
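The `errors: list[str] = list()` default in the rendered signature most likely reflects a `default_factory`, because `dataclasses` rejects a plain mutable list default. A minimal sketch under that assumption, using a local stand-in class rather than the real one:

```python
from dataclasses import dataclass, field


@dataclass
class DLQBatchReplayStats:
    # Local stand-in mirroring the documented fields.
    processed: int = 0
    success: int = 0
    failed: int = 0
    skipped: int = 0
    # Assumption: the real class uses a factory so each instance owns its list.
    errors: list[str] = field(default_factory=list)


first = DLQBatchReplayStats()
second = DLQBatchReplayStats()
first.errors.append("entry 7: handler timed out")
first.failed += 1
first.processed += 1
```

Each instance gets its own `errors` list, so appending to one batch's statistics does not leak into another's.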
DLQThrottleBatchReplayResult
dataclass
DLQThrottleBatchReplayResult(
total: int = 0,
succeeded: int = 0,
failed: int = 0,
skipped: int = 0,
early_stop_reason: str | None = None,
)
Result of a batch throttle-aware DLQ replay operation.
DLQThrottleReplayResult
dataclass
DLQThrottleReplayResult(
success: bool,
entry_id: str | None = None,
error: str | None = None,
retry_after: float | None = None,
)
Result of a single throttle-aware DLQ replay operation.
DLQServiceBase
DLQServiceBase(
config: DLQConfig | None = None,
repository: FailedOperationRepository | None = None,
)
Base class for DLQ Service.
Provides initialization and common utilities.
Initialize the DLQ service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `DLQConfig \| None` | Optional configuration, loads from settings if None | `None` |
| `repository` | `FailedOperationRepository \| None` | Optional repository for DI, uses Django adapter if None | `None` |
repository
property
repository: FailedOperationRepository
Get the repository using ProviderRegistry with fallback policy.
is_enabled
property
is_enabled: bool
Check if DLQ is enabled.
EntryOperationsMixin
Mixin providing DLQ entry operations using Repository pattern.
retry_entry
retry_entry(pk: str) -> dict[str, Any]
Retry a single DLQ entry by re-executing it through its replay handler.
Applies the same per-entry pipeline as the batch replay() path to
the single operator-selected entry: cap-gate, atomic acquire,
handler execution, then a cap-aware terminal transition. success
means the replay succeeded — not merely that the counter advanced.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pk` | `str` | Entry primary key | required |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with operation details: success: bool (True only if the replay handler succeeded); id: str; retry_count: int (post-attempt count); previous_retry_count: int; status: str (resulting entry status); message: str |

Raises:
| Type | Description |
|---|---|
| `DLQEntryNotFoundError` | If entry not found (-> HTTP 404) |
| `DLQStateConflictError` | If entry is resolved/archived, has exhausted its replay attempts (at cap), or is not in a replayable state (-> HTTP 409) |
| `DLQReplayError` | If the replay raised an unexpected exception (-> HTTP 500) |
force_redrive_entry
force_redrive_entry(
pk: str,
*,
actor_id: str | None = None,
reason: str = "",
ticket_url: str | None = None
) -> dict[str, Any]
Force-redrive an at-cap DLQ entry past the cap gate (operator override).
A deliberate, ADMIN-gated escape hatch: after diagnosing and fixing a
root cause, re-drive an entry that exhausted its cap because of that
now-fixed cause. Mirrors retry_entry() but acquires with
force=True (bypassing the cap gate) and grants a fresh retry budget.
The poison-pill convergence guarantee is preserved: a still-broken entry
re-converges to REQUIRES_REVIEW within max_replay_attempts further
automatic attempts.
The normal retry_entry() hard block on at-cap entries is untouched —
force is purely additive, and only this ADMIN-gated, audited path grants
the fresh budget.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pk` | `str` | Entry primary key | required |
| `actor_id` | `str \| None` | Acting operator (recorded in the force-redrive audit) | `None` |
| `reason` | `str` | Operator justification (recorded in the audit) | `''` |
| `ticket_url` | `str \| None` | Optional change/incident ticket reference (recorded) | `None` |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict mirroring |

Raises:
| Type | Description |
|---|---|
| `DLQEntryNotFoundError` | If entry not found (-> HTTP 404) |
| `DLQStateConflictError` | If entry is resolved/archived or not in a force-redrivable state (-> HTTP 409) |
| `DLQReplayError` | If the replay raised an unexpected exception (-> HTTP 500) |
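The cap gate, the force override, and the convergence guarantee described for force_redrive_entry can be illustrated with a toy model. Everything here is an assumption made for illustration: the `Entry` class, the `attempt` helper, modelling the fresh budget as a counter reset, counting the forced attempt against the new budget, and the RESOLVED transition on success. Only the status names and the default cap of 2 come from this page.

```python
from dataclasses import dataclass

MAX_REPLAY_ATTEMPTS = 2  # documented DLQConfig default


@dataclass
class Entry:
    # Toy model; the real entry has many more fields.
    retry_count: int = 0
    status: str = "PENDING"


def attempt(entry: Entry, handler_ok: bool, force: bool = False) -> bool:
    """Run one replay attempt; return True if the handler was executed."""
    if force:
        # Assumption: the fresh budget is modelled as a counter reset.
        entry.retry_count = 0
        entry.status = "PENDING"
    if entry.retry_count >= MAX_REPLAY_ATTEMPTS:
        return False  # cap gate: at-cap entries are blocked without force
    entry.retry_count += 1
    if handler_ok:
        entry.status = "RESOLVED"
    elif entry.retry_count >= MAX_REPLAY_ATTEMPTS:
        entry.status = "REQUIRES_REVIEW"  # cap-aware terminal transition
    return True


entry = Entry(retry_count=MAX_REPLAY_ATTEMPTS, status="REQUIRES_REVIEW")
blocked = attempt(entry, handler_ok=False)             # normal retry is refused
forced = attempt(entry, handler_ok=False, force=True)  # operator override
attempts_after_force = 1
while entry.status != "REQUIRES_REVIEW":
    attempt(entry, handler_ok=False)  # handler is still broken
    attempts_after_force += 1
```

In this model a still-broken entry is back in REQUIRES_REVIEW after at most `MAX_REPLAY_ATTEMPTS` attempts, so a force-redrive cannot turn a poison pill into an endless retry loop.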
resolve_entry
resolve_entry(
pk: str,
notes: str = "",
resolution_type: str = "manual",
status: str = "resolved",
) -> dict[str, Any]
Resolve a DLQ entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pk` | `str` | Entry primary key | required |
| `notes` | `str` | Resolution notes (optional) | `''` |
| `resolution_type` | `str` | How the entry was resolved (default: "manual") | `'manual'` |
| `status` | `str` | Target status (default: "resolved") | `'resolved'` |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with operation details: success: bool; id: str; previous_status: str; current_status: str; resolved_at: str (ISO format); notes: str |

Raises:
| Type | Description |
|---|---|
| `DLQEntryNotFoundError` | If entry not found (-> HTTP 404) |
| `DLQStateConflictError` | If entry is already resolved/archived (-> HTTP 409) |
| `DLQError` | If the resolve operation fails (-> HTTP 400) |
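The Raises tables for retry_entry, force_redrive_entry, and resolve_entry pair each exception with an HTTP status. A host application might centralise that pairing as sketched below. The exception classes are local stand-ins named after the documented ones; the inheritance from `DLQError` and the `http_status_for` helper are assumptions, not part of the documented API.

```python
class DLQError(Exception):
    """Stand-in for the documented generic DLQ error."""


class DLQEntryNotFoundError(DLQError):
    """Stand-in; the real class hierarchy is an assumption here."""


class DLQStateConflictError(DLQError):
    """Stand-in for the documented state-conflict error."""


class DLQReplayError(DLQError):
    """Stand-in for the documented replay error."""


# Most specific classes first, so the generic DLQError acts as the fallback.
STATUS_BY_ERROR = (
    (DLQEntryNotFoundError, 404),
    (DLQStateConflictError, 409),
    (DLQReplayError, 500),
    (DLQError, 400),
)


def http_status_for(exc: Exception) -> int:
    """Return the HTTP status the Raises tables associate with `exc`."""
    for error_type, status in STATUS_BY_ERROR:
        if isinstance(exc, error_type):
            return status
    return 500  # anything unexpected is treated as a server error
```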
resolve_entries_batch
resolve_entries_batch(
pks: list[str],
resolution_type: str = "manual",
status: str = "resolved",
notes: str = "",
chunk_size: int | None = None,
) -> dict[str, Any]
Resolve multiple DLQ entries in chunks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pks` | `list[str]` | Entry primary keys to resolve | required |
| `resolution_type` | `str` | How entries were resolved | `'manual'` |
| `status` | `str` | Target status | `'resolved'` |
| `notes` | `str` | Resolution notes | `''` |
| `chunk_size` | `int \| None` | Chunk size (default: from DLQSettings) | `None` |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with batch operation summary |
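Resolving "in chunks" means the key list is cut into consecutive slices that are processed one at a time. The helper below is a generic sketch of that slicing; `chunked` is a hypothetical name, and how the service applies each chunk or builds its summary dict is not shown on this page.

```python
from typing import Iterator, Sequence


def chunked(pks: Sequence[str], chunk_size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `chunk_size` keys."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    for start in range(0, len(pks), chunk_size):
        yield pks[start:start + chunk_size]


chunks = [list(c) for c in chunked(["a", "b", "c", "d", "e"], chunk_size=2)]
```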
get_entry
get_entry(pk: str) -> dict[str, Any] | None
Get detailed info for a single DLQ entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pk` | `str` | Entry primary key | required |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any] \| None` | Dictionary with entry details or None if not found |
ListOperationsMixin
Mixin providing DLQ list operations using Repository pattern.
list_entries
list_entries(
filters: dict[str, Any] | None = None,
page: int = 1,
page_size: int = 20,
) -> dict[str, Any]
Get paginated list of DLQ entries.
Delegates pagination to the repository-native find/count
primitive: storage-layer offset/limit instead of
load-everything-then-slice, and a no-status filter spans ALL statuses
(escalated/terminal entries are visible by default), not a hardcoded
subset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `filters` | `dict[str, Any] \| None` | Dictionary with filter conditions: status (filter by status); domain (filter by domain); failure_type (filter by failure type) | `None` |
| `page` | `int` | Page number (default 1, clamped to >= 1) | `1` |
| `page_size` | `int` | Items per page (default 20, max 100) | `20` |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with entries and pagination info: results (list of entry dicts); page (current page); page_size (items per page); total_pages (total number of pages); total_count (total number of items); has_next (whether there's a next page); has_previous (whether there's a previous page) |
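The pagination fields in the Returns table follow from `total_count`, `page`, and `page_size` by simple arithmetic. The sketch below shows one way to derive them together with the offset/limit window a storage layer would be asked for. `page_info` is a hypothetical helper; clamping an oversized `page_size` to 100 (rather than rejecting it) and reporting zero pages for an empty result are assumptions.

```python
import math
from typing import Any

MAX_PAGE_SIZE = 100  # documented maximum


def page_info(total_count: int, page: int = 1, page_size: int = 20) -> dict[str, Any]:
    """Compute the pagination fields listed in the Returns table."""
    page = max(page, 1)  # documented: clamped to >= 1
    # Assumption: out-of-range sizes are clamped rather than rejected.
    page_size = min(max(page_size, 1), MAX_PAGE_SIZE)
    total_pages = math.ceil(total_count / page_size)
    return {
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
        "total_count": total_count,
        "has_next": page < total_pages,
        "has_previous": page > 1,
        # Not part of the documented result: the storage-layer window.
        "offset": (page - 1) * page_size,
        "limit": page_size,
    }


info = page_info(total_count=45, page=0, page_size=500)
second_page = page_info(total_count=45, page=2)
```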
MaintenanceOperationsMixin
DLQ maintenance: expire, archive, purge.
archive_old_entries
archive_old_entries(older_than_days: int = 30) -> int
Archive old RESOLVED entries. Called by CleanupService.
cleanup_old_entries
cleanup_old_entries(days_old: int = 30) -> dict
Compound: expire + archive. Called by cleanup_resolved_dlq_entries task.
purge_archived
purge_archived(
ids: list[str] | None = None,
older_than_days: int | None = None,
) -> int
Permanently delete old ARCHIVED entries. Called by CleanupService.
count_archived_older_than
count_archived_older_than(older_than_days: int) -> int
Count ARCHIVED entries older than N days. Called by CleanupService dry_run.
get_cleanup_stats
get_cleanup_stats() -> CleanupStats
Cleanup statistics as a CleanupStats value object.
Bridges the repository's dict-shaped cleanup stats to the canonical
CleanupStats model (its can_archive/can_purge derive from the
age-bucketed counts), matching the StatisticsProvider adapters that
already return CleanupStats. Read by the admin handler
dlq_cleanup_stats via attribute access.
expire_entries
expire_entries() -> int
Mark entries past expires_at as EXPIRED. Drains in batches.
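"Drains in batches" suggests a loop that repeatedly picks up a bounded number of overdue entries until none remain. The in-memory sketch below illustrates that loop only; the dict-based rows, the `expire_in_batches` helper, the restriction to PENDING entries, and the `<=` comparison are all assumptions rather than the service's actual query.

```python
from datetime import datetime, timedelta, timezone


def expire_in_batches(entries: list[dict], now: datetime, batch_size: int = 2) -> int:
    """Mark PENDING entries past `expires_at` as EXPIRED, one batch at a time."""
    expired_total = 0
    while True:
        batch = [
            e for e in entries
            if e["status"] == "PENDING" and e["expires_at"] <= now
        ][:batch_size]
        if not batch:
            return expired_total  # nothing left to drain
        for entry in batch:
            entry["status"] = "EXPIRED"
        expired_total += len(batch)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
rows = [
    {"status": "PENDING", "expires_at": now - timedelta(hours=h)}
    for h in (1, 2, 3)
] + [{"status": "PENDING", "expires_at": now + timedelta(hours=1)}]
count = expire_in_batches(rows, now)
```

Bounding each pass keeps individual writes small, which matters when a backlog of overdue entries has built up.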
QueryOperationsMixin
Mixin providing DLQ query operations.
get_pending_entries
get_pending_entries(
domain: str | None = None,
failure_type: str | None = None,
limit: int = 100,
) -> list[FailedOperationData]
Get pending DLQ entries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `domain` | `str \| None` | Filter by domain (optional) | `None` |
| `failure_type` | `str \| None` | Filter by failure type (optional) | `None` |
| `limit` | `int` | Maximum number of entries to return | `100` |

Returns:
| Type | Description |
|---|---|
| `list[FailedOperationData]` | List of pending FailedOperationData entries |
get_replayable_entries
get_replayable_entries(
domain: str | None = None,
failure_type: str | None = None,
limit: int = 100,
) -> list[FailedOperationData]
Get entries that can be replayed.
Entries are replayable if:
- Status is PENDING
- retry_count < max_retries

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `domain` | `str \| None` | Filter by domain (optional) | `None` |
| `failure_type` | `str \| None` | Filter by failure type (optional) | `None` |
| `limit` | `int` | Maximum number of entries to return | `100` |

Returns:
| Type | Description |
|---|---|
| `list[FailedOperationData]` | List of replayable FailedOperationData entries |
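The two replayability conditions combine into a single predicate. The sketch below applies it to a toy stand-in for FailedOperationData; the `FailedOp` class, the `is_replayable` helper, and the string form of the status values are assumptions.

```python
from dataclasses import dataclass


@dataclass
class FailedOp:
    # Toy stand-in for FailedOperationData with only the fields used here.
    id: str
    status: str
    retry_count: int
    max_retries: int


def is_replayable(op: FailedOp) -> bool:
    """Both documented conditions must hold."""
    return op.status == "PENDING" and op.retry_count < op.max_retries


ops = [
    FailedOp("a", "PENDING", 0, 2),
    FailedOp("b", "PENDING", 2, 2),   # at cap
    FailedOp("c", "RESOLVED", 0, 2),  # wrong status
]
replayable_ids = [op.id for op in ops if is_replayable(op)]
```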
get_sla_breached_entries
get_sla_breached_entries() -> list[FailedOperationData]
Get entries that have breached their SLA.
SLA thresholds are loaded from configuration (domain-free). Uses SLAConfig.get_all_thresholds() to support any configured domain.
Returns:
| Type | Description |
|---|---|
| `list[FailedOperationData]` | List of SLA-breached FailedOperationData entries |
get_expired_entries
get_expired_entries() -> list[FailedOperationData]
Get entries that have passed their retention period.
Returns:
| Type | Description |
|---|---|
| `list[FailedOperationData]` | List of expired FailedOperationData entries |
get_entry_by_id
get_entry_by_id(dlq_id: str) -> FailedOperationData | None
Get a single DLQ entry by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `dlq_id` | `str` | The DLQ entry ID | required |

Returns:
| Type | Description |
|---|---|
| `FailedOperationData \| None` | FailedOperationData or None |
get_stats
get_stats() -> dict[str, Any]
Get DLQ statistics.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary with DLQ statistics |
get_facet_counts
get_facet_counts(
*, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]
Get faceted status×domain DLQ counts for the admin-console filter.
Thin pass-through to the repository. The by_status map is scoped
by domain and by_domain is scoped by status (standard
faceted-search semantics); zero-count buckets are dropped. See
FailedOperationRepository.get_facet_counts for the full contract.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `status` | `str \| None` | Filter by status (scopes the | `None` |
| `domain` | `str \| None` | Filter by domain (scopes the | `None` |

Returns:
| Type | Description |
|---|---|
| `dict[str, dict[str, int]]` | Dict with |
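The cross-scoping described for get_facet_counts (by_status scoped by domain, by_domain scoped by status, zero-count buckets dropped) is easiest to see on a small data set. The sketch below counts plain `(status, domain)` tuples in memory; the `facet_counts` helper and the sample values are illustrative assumptions, and the repository's real contract lives in FailedOperationRepository.get_facet_counts.

```python
from collections import Counter
from typing import Optional


def facet_counts(
    rows: list[tuple[str, str]],
    *,
    status: Optional[str] = None,
    domain: Optional[str] = None,
) -> dict[str, dict[str, int]]:
    """Count (status, domain) rows with cross-scoped facets."""
    # by_status ignores the status filter and honours only the domain filter.
    by_status = Counter(s for s, d in rows if domain is None or d == domain)
    # by_domain ignores the domain filter and honours only the status filter.
    by_domain = Counter(d for s, d in rows if status is None or s == status)
    # A Counter built this way has no keys for unseen values, so empty
    # buckets are simply absent from the result.
    return {"by_status": dict(by_status), "by_domain": dict(by_domain)}


rows = [
    ("pending", "payment"),
    ("pending", "webhook"),
    ("resolved", "payment"),
]
facets = facet_counts(rows, status="pending", domain="payment")
```

With both filters set, each facet still shows the alternatives along its own axis, which is what lets a filter UI display counts for the options the user has not selected.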
ReplayOperationsMixin
Mixin providing DLQ replay operations.
replay
replay(
domain: str | None = None,
batch_size: int = 50,
request: Any = None,
) -> DLQBatchReplayStats
Execute batch replay of pending DLQ entries.
Hybrid logic:
- request present -> buffer into RequestAuditBuffer (AuditMiddleware flushes in batch)
- request absent -> call the adapter directly (async contexts such as Celery)

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `domain` | `str \| None` | Filter by domain (optional) | `None` |
| `batch_size` | `int` | Maximum number of entries to process (default 50) | `50` |
| `request` | `Any` | Django HttpRequest object (buffered when present) | `None` |

Returns:
| Type | Description |
|---|---|
| `DLQBatchReplayStats` | DLQBatchReplayStats with operation statistics |
replay_throttle_aware
replay_throttle_aware(
entry_id: str, throttle: AdaptiveThrottle
) -> DLQThrottleReplayResult
Throttle-aware safe single replay.
Validation order:
1. expires_at TTL expiry check (FailedOperationData.expires_at)
2. can_retry retry-limit check (retry_count < max_retries)
3. Throttle permit acquisition (store_rejection=False prevents DLQ re-storage)
4. _execute_replay() pipeline (can_replay -> replay)

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entry_id` | `str` | DLQ entry ID to reprocess | required |
| `throttle` | `AdaptiveThrottle` | AdaptiveThrottle instance | required |

Returns:
| Type | Description |
|---|---|
| `DLQThrottleReplayResult` | DLQThrottleReplayResult |
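The four-step validation order can be sketched as a chain of early returns. This is an illustration under stated assumptions, not the service's code: `ThrottleReplayResult` and `Entry` are local stand-ins, `acquire_permit` is a hypothetical callable that returns None when a permit is granted or a wait time in seconds otherwise, and the error strings are invented. The real AdaptiveThrottle API is not shown on this page.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional


@dataclass
class ThrottleReplayResult:
    # Stand-in mirroring DLQThrottleReplayResult's documented fields.
    success: bool
    entry_id: Optional[str] = None
    error: Optional[str] = None
    retry_after: Optional[float] = None


@dataclass
class Entry:
    id: str
    expires_at: datetime
    retry_count: int
    max_retries: int


def replay_one(
    entry: Entry,
    acquire_permit: Callable[[], Optional[float]],  # hypothetical: None = granted
    handler: Callable[[Entry], bool],
    now: datetime,
) -> ThrottleReplayResult:
    if entry.expires_at <= now:                 # 1. TTL expiry
        return ThrottleReplayResult(False, entry.id, error="expired")
    if entry.retry_count >= entry.max_retries:  # 2. retry limit
        return ThrottleReplayResult(False, entry.id, error="retry limit reached")
    wait = acquire_permit()                     # 3. throttle permit
    if wait is not None:
        return ThrottleReplayResult(False, entry.id, error="throttled", retry_after=wait)
    ok = handler(entry)                         # 4. replay pipeline
    return ThrottleReplayResult(ok, entry.id, error=None if ok else "replay failed")


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
entry = Entry("e1", datetime(2024, 1, 2, tzinfo=timezone.utc), retry_count=0, max_retries=2)
throttled = replay_one(entry, lambda: 1.5, lambda e: True, now)
replayed = replay_one(entry, lambda: None, lambda e: True, now)
```

Putting the cheap local checks first means an expired or exhausted entry never consumes a throttle permit.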
replay_all_throttle_aware
replay_all_throttle_aware(
throttle: AdaptiveThrottle,
domain: str | None = None,
batch_size: int = 10,
max_entries: int = 100,
) -> DLQThrottleBatchReplayResult
Throttle-aware batch replay.
Candidates are fetched through get_replayable_entries(), which returns only entries with retry_count < max_retries. Throttle health is checked once per batch, and processing stops early when the throttle reports Emergency.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `throttle` | `AdaptiveThrottle` | AdaptiveThrottle instance | required |
| `domain` | `str \| None` | Domain filter (optional) | `None` |
| `batch_size` | `int` | Batch size for Throttle health checks | `10` |
| `max_entries` | `int` | Maximum number of entries to process | `100` |

Returns:
| Type | Description |
|---|---|
| `DLQThrottleBatchReplayResult` | DLQThrottleBatchReplayResult |
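A per-batch health check with early stop can be sketched as below. `BatchResult` mirrors the documented result fields, but `replay_all`, the `is_emergency` probe, the "emergency" reason string, and the exact meaning given to `total` and `skipped` are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Sequence


@dataclass
class BatchResult:
    # Stand-in mirroring DLQThrottleBatchReplayResult's documented fields.
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    early_stop_reason: Optional[str] = None


def replay_all(
    entry_ids: Sequence[str],
    replay_one: Callable[[str], bool],
    is_emergency: Callable[[], bool],  # hypothetical health probe
    batch_size: int = 10,
    max_entries: int = 100,
) -> BatchResult:
    result = BatchResult()
    todo = list(entry_ids)[:max_entries]
    for start in range(0, len(todo), batch_size):
        if is_emergency():  # health is checked once per batch
            result.early_stop_reason = "emergency"
            result.skipped = len(todo) - start  # entries never attempted
            break
        for entry_id in todo[start:start + batch_size]:
            result.total += 1
            if replay_one(entry_id):
                result.succeeded += 1
            else:
                result.failed += 1
    return result


health = iter([False, True])  # the second batch sees an emergency
outcome = replay_all(
    ["a", "b", "c", "d"],
    replay_one=lambda entry_id: entry_id != "b",
    is_emergency=lambda: next(health),
    batch_size=2,
)
```

A smaller `batch_size` makes the loop react to a degrading downstream system sooner, at the cost of more health checks.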
StoreOperationsMixin
Mixin providing DLQ store operations.
store_failure
store_failure(
domain: str,
failure_type: str,
entity_type: str | None = None,
entity_id: str | None = None,
user_id: int | None = None,
error_code: str = "",
error_message: str = "",
snapshot_data: dict[str, Any] | None = None,
request_data: dict[str, Any] | None = None,
response_data: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
next_action_hint: str = "",
recommended_action: str = "",
request: Any = None,
mode: Literal["sync", "async"] | None = None,
) -> DLQEntryResult
Store a failed operation in the DLQ.
Hybrid logic:
- request present -> buffer into RequestAuditBuffer (AuditMiddleware flushes in batch)
- request absent -> call the adapter directly (async contexts such as Celery)

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `domain` | `str` | Business domain (payment, point, inventory, webhook, notification) | required |
| `failure_type` | `str` | Specific failure type (e.g., PG_TIMEOUT, AMOUNT_MISMATCH) | required |
| `entity_type` | `str \| None` | Type of related entity (e.g., "order", "payment", "product") | `None` |
| `entity_id` | `str \| None` | ID of related entity | `None` |
| `user_id` | `int \| None` | Related User ID | `None` |
| `error_code` | `str` | Error code from external system | `''` |
| `error_message` | `str` | Human-readable error message | `''` |
| `snapshot_data` | `dict[str, Any] \| None` | State snapshot for recovery | `None` |
| `request_data` | `dict[str, Any] \| None` | Original request payload | `None` |
| `response_data` | `dict[str, Any] \| None` | External system response | `None` |
| `metadata` | `dict[str, Any] \| None` | Additional debug context | `None` |
| `next_action_hint` | `str` | Guidance for operators | `''` |
| `recommended_action` | `str` | Suggested action (replay, manual_check, etc.) | `''` |
| `request` | `Any` | Django HttpRequest object (buffered when present) | `None` |
| `mode` | `Literal['sync', 'async'] \| None` | Dispatch mode: | `None` |

Returns:
| Type | Description |
|---|---|
| `DLQEntryResult` | DLQEntryResult with creation status |
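The hybrid logic above is a two-way dispatch on whether a request object is in scope. The sketch below models it with plain lists. `FakeRequest`, its `audit_buffer` attribute, the `store` and `flush` helpers, and the "HTTP_500" failure type are all hypothetical; they only stand in for RequestAuditBuffer, AuditMiddleware, and the storage adapter, whose real interfaces are not shown on this page.

```python
from typing import Any, Optional


class FakeRequest:
    """Stand-in for a Django HttpRequest carrying a per-request buffer."""

    def __init__(self) -> None:
        self.audit_buffer: list[dict[str, Any]] = []  # hypothetical attribute


written: list[dict[str, Any]] = []  # stands in for the storage adapter


def store(record: dict[str, Any], request: Optional[FakeRequest] = None) -> str:
    """Buffer when a request is present, otherwise write immediately."""
    if request is not None:
        request.audit_buffer.append(record)  # flushed later by middleware
        return "buffered"
    written.append(record)  # e.g. a Celery task with no request in scope
    return "written"


def flush(request: FakeRequest) -> int:
    """What a middleware might do at the end of the request: one batch write."""
    count = len(request.audit_buffer)
    written.extend(request.audit_buffer)
    request.audit_buffer.clear()
    return count


req = FakeRequest()
first = store({"domain": "payment", "failure_type": "PG_TIMEOUT"}, request=req)
second = store({"domain": "webhook", "failure_type": "HTTP_500"})
flushed = flush(req)
```

Buffering inside a request lets several failures from the same request be written in one batch, while background contexts, which have no middleware to flush for them, write directly.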
create_test_entry
create_test_entry(
healing_domain: str | None = None,
failure_type: str | None = None,
user_id: int | None = None,
entity_type: str | None = "test",
entity_id: str | None = "",
error_message: str = "Test failure for load testing",
snapshot_data: dict[str, Any] | None = None,
request_data: dict[str, Any] | None = None,
response_data: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
created_by: str = "",
) -> dict[str, Any]
Create a synthetic DLQ entry for debugging / load testing.
Thin wrapper over store_failure(mode="sync") so the result is a
real, retrievable DLQ record on the same store / audit / metrics path
as a production failure. Returns a JSON-safe dict carrying the real
dlq_id — the admin POST /dlq/test/create handler returns it
verbatim with HTTP 201. Debug-only: callers outside DEBUG gate this at
the framework layer.
DLQService
DLQService(
config: DLQConfig | None = None,
repository: FailedOperationRepository | None = None,
)
Bases: StoreOperationsMixin, QueryOperationsMixin, ReplayOperationsMixin, EntryOperationsMixin, ListOperationsMixin, MaintenanceOperationsMixin, DLQServiceBase
Dead Letter Queue Service.
Provides centralized operations for managing failed operations.
Usage
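    from baldur_pro.services.dlq import DLQService

    service = DLQService()
    result = service.store_failure(
        domain="payment",
        failure_type="PG_TIMEOUT",
        # store_failure() has no `order` parameter, so the order is referenced
        # through entity_type/entity_id. `order` is the host application's
        # object, assumed here to be a Django model instance.
        entity_type="order",
        entity_id=str(order.pk),
        error_message="Connection timed out",
    )
    if result.success:
        print(f"Stored as DLQ entry {result.dlq_id}")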
For testing with mock repository
    from unittest.mock import Mock

    mock_repo = Mock(spec=FailedOperationRepository)
    service = DLQService(repository=mock_repo)
get_dlq_service
get_dlq_service() -> DLQService
Get the singleton DLQ service instance.
reset_dlq_service
reset_dlq_service() -> None
Reset the singleton DLQ service instance.
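The get/reset pair follows the usual lazily created singleton pattern, where reset exists mainly so tests can start from a clean instance. A generic sketch of that pattern follows; `Service`, `get_service`, `reset_service`, and the use of a lock are stand-ins and assumptions, not the module's actual code.

```python
import threading
from typing import Optional


class Service:
    """Stand-in for DLQService."""


# A small holder avoids `global` statements; the lock is an assumption,
# since this page does not say whether the real accessor is thread-safe.
_state: dict[str, Optional[Service]] = {"instance": None}
_lock = threading.Lock()


def get_service() -> Service:
    """Create the instance on first use and reuse it afterwards."""
    with _lock:
        if _state["instance"] is None:
            _state["instance"] = Service()
        return _state["instance"]


def reset_service() -> None:
    """Drop the cached instance, e.g. between tests."""
    with _lock:
        _state["instance"] = None


a = get_service()
b = get_service()
reset_service()
c = get_service()
```

After a reset the next call builds a fresh instance, so configuration or repository changes made in one test do not carry over into the next.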
store_to_dlq
store_to_dlq(
domain: str,
failure_type: str,
entity_type: str | None = None,
entity_id: str | None = None,
user_id: int | None = None,
error_code: str = "",
error_message: str = "",
snapshot_data: dict[str, Any] | None = None,
request_data: dict[str, Any] | None = None,
response_data: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
next_action_hint: str = "",
recommended_action: str = "",
request: Any = None,
mode: Literal["sync", "async"] | None = None,
) -> DLQEntryResult
Convenience function to store a failure in the DLQ.
This is a shortcut for get_dlq_service().store_failure(...). See
DLQService.store_failure for mode= semantics.