baldur.interfaces — Statistics & Event Journal

The statistics repository surface (summary DTOs + audit-trail records) and the event-journal interface that backs replay and forensics.

Statistics DTOs

StatusCounts dataclass

StatusCounts(
    total: int = 0,
    pending: int = 0,
    resolved: int = 0,
    failed: int = 0,
    archived: int = 0,
    reviewing: int = 0,
    replayed: int = 0,
    requires_review: int = 0,
    rejected: int = 0,
    expired: int = 0,
)

Status-wise count of DLQ entries.

DomainDistribution dataclass

DomainDistribution(
    domain: str, count: int, percentage: float
)

Domain-wise distribution of DLQ entries.

FailureTypeDistribution dataclass

FailureTypeDistribution(
    failure_type: str, count: int, percentage: float
)

Failure type distribution.

RecentActivity dataclass

RecentActivity(
    new_in_24h: int = 0,
    resolved_in_24h: int = 0,
    new_in_7d: int = 0,
    resolved_in_7d: int = 0,
    trend: str = "stable",
)

Recent activity statistics.

CleanupStats dataclass

CleanupStats(
    total: int = 0,
    by_status: dict[str, int] = dict(),
    resolved_older_than_30_days: int = 0,
    archived_older_than_90_days: int = 0,
)

Statistics for DLQ cleanup operations.

PaginatedResult dataclass

PaginatedResult(
    items: list[Any] = list(),
    total: int = 0,
    page: int = 1,
    page_size: int = 20,
    has_next: bool = False,
    has_prev: bool = False,
)

Paginated query result.

total_pages property

total_pages: int

Calculate total number of pages.
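
The page count follows from total and page_size. The source does not show how the property is computed, so the following is a minimal sketch that assumes ceiling division and zero pages for an empty result. PageSketch is a stand-in that mirrors the documented fields, not the library class.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PageSketch:
    """Stand-in mirroring the documented PaginatedResult fields."""

    items: list[Any] = field(default_factory=list)
    total: int = 0
    page: int = 1
    page_size: int = 20
    has_next: bool = False
    has_prev: bool = False

    @property
    def total_pages(self) -> int:
        # Assumed behaviour: ceiling division; a non-positive page_size
        # yields 0 pages instead of raising.
        if self.page_size <= 0:
            return 0
        return (self.total + self.page_size - 1) // self.page_size
```

Under these assumptions, 41 items at 20 per page give 3 pages, and exactly 40 items give 2.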

CircuitBreakerSummary dataclass

CircuitBreakerSummary(
    total: int = 0,
    closed: int = 0,
    open: int = 0,
    half_open: int = 0,
)

Summary of all circuit breakers.

CircuitBreakerInfo dataclass

CircuitBreakerInfo(
    service_name: str,
    state: str,
    failure_count: int = 0,
    success_count: int = 0,
    last_failure_time: datetime | None = None,
    last_state_change: datetime | None = None,
)

Information about a single circuit breaker.

Audit-trail DTOs

AuditTrailEntry dataclass

AuditTrailEntry(
    timestamp: datetime,
    action: str,
    actor_id: str | None = None,
    status: str | None = None,
    details: str | None = None,
    hash_chain: str | None = None,
    previous_hash: str | None = None,
)

Single audit trail entry for an entity.

EntityAuditTrail dataclass

EntityAuditTrail(
    entity_id: str,
    entity_type: str,
    domain: str,
    created_at: datetime | None = None,
    resolved_at: datetime | None = None,
    current_status: str = "unknown",
    entries: list[AuditTrailEntry] = list(),
)

Complete audit trail for a DLQ entity.

Provides end-to-end traceability from creation to resolution.

total_entries property

total_entries: int

Total number of audit entries.

is_chain_valid property

is_chain_valid: bool

Verify hash chain integrity.

Returns True if every entry has a valid hash chain.
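
The source does not specify how the chain is checked. One plausible reading, sketched here purely as an assumption, is that every entry must carry a hash_chain value and that each entry's previous_hash must equal the hash_chain of the entry before it. EntrySketch and chain_is_valid are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EntrySketch:
    """Stand-in carrying only the two hash fields of AuditTrailEntry."""

    hash_chain: Optional[str] = None
    previous_hash: Optional[str] = None


def chain_is_valid(entries: list[EntrySketch]) -> bool:
    # Assumption: entries are in chronological order.
    expected_previous: Optional[str] = None
    for index, entry in enumerate(entries):
        # An entry without its own hash cannot take part in the chain.
        if entry.hash_chain is None:
            return False
        # Every entry after the first must point at its predecessor.
        if index > 0 and entry.previous_hash != expected_previous:
            return False
        expected_previous = entry.hash_chain
    return True
```

An empty trail is treated as valid in this sketch; the real property may decide otherwise.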

Interfaces

StatisticsRepositoryInterface

Bases: ABC

Statistics/Dashboard Repository Interface.

This interface defines operations for dashboards and analytics. Unlike runtime repositories (optimized for speed), this interface is designed for complex aggregate queries.

Implementations:

- DjangoStatisticsAdapter: Uses Django ORM
- SQLAlchemyStatisticsAdapter: Uses SQLAlchemy
- NullStatisticsRepository: Returns empty results (default)

Usage

from baldur.factory import ProviderRegistry

stats_repo = ProviderRegistry.get_statistics_repo()
counts = stats_repo.get_status_counts()

get_status_counts abstractmethod

get_status_counts() -> StatusCounts

Get count of DLQ entries by status.

Returns:

Type Description
StatusCounts

StatusCounts with counts for each status

get_domain_distribution abstractmethod

get_domain_distribution(
    limit: int = 10,
) -> list[DomainDistribution]

Get distribution of DLQ entries by domain.

Parameters:

Name Type Description Default
limit int

Maximum number of domains to return (top N)

10

Returns:

Type Description
list[DomainDistribution]

List of DomainDistribution sorted by count descending

get_failure_type_distribution abstractmethod

get_failure_type_distribution(
    limit: int = 10,
) -> list[FailureTypeDistribution]

Get distribution of DLQ entries by failure type.

Parameters:

Name Type Description Default
limit int

Maximum number of failure types to return (top N)

10

Returns:

Type Description
list[FailureTypeDistribution]

List of FailureTypeDistribution sorted by count descending

get_recent_activity abstractmethod

get_recent_activity(
    hours: int = 24, days: int = 7
) -> RecentActivity

Get recent activity statistics.

Parameters:

Name Type Description Default
hours int

Hours to look back for hourly stats

24
days int

Days to look back for daily stats

7

Returns:

Type Description
RecentActivity

RecentActivity with new/resolved counts and trend

get_resolution_rate abstractmethod

get_resolution_rate(days: int = 30) -> float

Calculate resolution success rate.

Parameters:

Name Type Description Default
days int

Number of days to look back

30

Returns:

Type Description
float

Resolution rate as a float (0.0 to 1.0)

get_avg_retry_count abstractmethod

get_avg_retry_count() -> float

Get average retry count across all DLQ entries.

Returns:

Type Description
float

Average retry count as a float

list_entries abstractmethod

list_entries(
    page: int = 1,
    page_size: int = 20,
    status: str | None = None,
    domain: str | None = None,
    failure_type: str | None = None,
    order_by: str = "-created_at",
) -> PaginatedResult

List DLQ entries with pagination and filtering.

Parameters:

Name Type Description Default
page int

Page number (1-indexed)

1
page_size int

Number of items per page

20
status str | None

Filter by status (optional)

None
domain str | None

Filter by domain (optional)

None
failure_type str | None

Filter by failure type (optional)

None
order_by str

Sort order (prefix with - for descending)

'-created_at'

Returns:

Type Description
PaginatedResult

PaginatedResult containing DLQ entries
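
A caller that needs every matching entry can walk the pages by following has_next. The sketch below assumes has_next is False on the final page. PageSketch and FakeStatsRepo are hypothetical in-memory stand-ins so the loop can run without the library, and iter_entries is an illustrative helper, not part of the interface.

```python
from dataclasses import dataclass, field
from typing import Any, Iterator, Optional


@dataclass
class PageSketch:
    """Stand-in mirroring the documented PaginatedResult fields."""

    items: list[Any] = field(default_factory=list)
    total: int = 0
    page: int = 1
    page_size: int = 20
    has_next: bool = False
    has_prev: bool = False


class FakeStatsRepo:
    """Hypothetical in-memory repository exposing only list_entries."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self._rows = rows

    def list_entries(
        self, page: int = 1, page_size: int = 20, status: Optional[str] = None
    ) -> PageSketch:
        rows = [r for r in self._rows if status is None or r["status"] == status]
        start = (page - 1) * page_size  # pages are 1-indexed
        return PageSketch(
            items=rows[start:start + page_size],
            total=len(rows),
            page=page,
            page_size=page_size,
            has_next=start + page_size < len(rows),
            has_prev=page > 1,
        )


def iter_entries(repo: Any, page_size: int = 20, **filters: Any) -> Iterator[Any]:
    """Yield every entry across all pages, stopping when has_next is False."""
    page = 1
    while True:
        result = repo.list_entries(page=page, page_size=page_size, **filters)
        yield from result.items
        if not result.has_next:
            break
        page += 1
```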

get_entry_detail abstractmethod

get_entry_detail(entry_id: str) -> dict[str, Any] | None

Get detailed information about a specific DLQ entry.

Parameters:

Name Type Description Default
entry_id str

Unique identifier of the entry

required

Returns:

Type Description
dict[str, Any] | None

Dict with entry details or None if not found

get_sla_breaches abstractmethod

get_sla_breaches(
    sla_threshold_hours: int = 4,
    statuses: list[str] | None = None,
) -> dict[str, int]

Get count of SLA breaches by domain.

Finds DLQ entries that have exceeded the SLA threshold for resolution.

Parameters:

Name Type Description Default
sla_threshold_hours int

SLA threshold in hours (default: 4)

4
statuses list[str] | None

List of statuses to check (default: pending, reviewing, requires_review)

None

Returns:

Type Description
dict[str, int]

Dictionary mapping domain to breach count
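
The breach count can be pictured as a filter followed by a group-by. The sketch below assumes that an entry's age is measured from a created_at timestamp and that the documented default statuses apply when statuses is None. The source does not confirm either detail, and count_sla_breaches is a hypothetical helper operating on plain dicts.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Any, Optional

# Defaults taken from the parameter description above.
DEFAULT_STATUSES = ["pending", "reviewing", "requires_review"]


def count_sla_breaches(
    entries: list[dict[str, Any]],
    now: datetime,
    sla_threshold_hours: int = 4,
    statuses: Optional[list[str]] = None,
) -> dict[str, int]:
    """Count unresolved entries older than the threshold, keyed by domain."""
    wanted = statuses if statuses is not None else DEFAULT_STATUSES
    cutoff = now - timedelta(hours=sla_threshold_hours)
    breaches: Counter = Counter()
    for entry in entries:
        # Assumption: age is measured from created_at.
        if entry["status"] in wanted and entry["created_at"] < cutoff:
            breaches[entry["domain"]] += 1
    return dict(breaches)
```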

get_cleanup_stats abstractmethod

get_cleanup_stats() -> CleanupStats

Get statistics for cleanup operations.

Returns:

Type Description
CleanupStats

CleanupStats with counts of entries eligible for cleanup

archive_old_entries abstractmethod

archive_old_entries(older_than_days: int = 30) -> int

Archive old resolved entries.

Parameters:

Name Type Description Default
older_than_days int

Archive entries resolved more than N days ago

30

Returns:

Type Description
int

Number of entries archived

purge_archived abstractmethod

purge_archived(
    ids: list[str] | None = None,
    older_than_days: int | None = None,
) -> int

Permanently delete archived entries.

Parameters:

Name Type Description Default
ids list[str] | None

Specific entry IDs to purge (optional)

None
older_than_days int | None

Purge archived entries older than N days (optional)

None

Returns:

Type Description
int

Number of entries purged

get_circuit_breaker_summary abstractmethod

get_circuit_breaker_summary() -> CircuitBreakerSummary

Get summary of all circuit breakers.

Returns:

Type Description
CircuitBreakerSummary

CircuitBreakerSummary with counts by state

list_circuit_breakers abstractmethod

list_circuit_breakers() -> list[CircuitBreakerInfo]

List all circuit breakers with their current state.

Returns:

Type Description
list[CircuitBreakerInfo]

List of CircuitBreakerInfo for all registered circuit breakers

persist_entry abstractmethod

persist_entry(entry_data: dict[str, Any]) -> str | None

Persist a DLQ entry to the statistics store.

Called by the runtime layer to sync data to the ORM for statistics. Can be called asynchronously.

Parameters:

Name Type Description Default
entry_data dict[str, Any]

Entry data from runtime repository

required

Returns:

Type Description
str | None

Entry ID if persisted, None otherwise

sync_from_runtime abstractmethod

sync_from_runtime(entries: list[dict[str, Any]]) -> int

Bulk sync entries from runtime repository.

Used for periodic synchronization.

Parameters:

Name Type Description Default
entries list[dict[str, Any]]

List of entry data from runtime repository

required

Returns:

Type Description
int

Number of entries synced

get_audit_trail_by_entity abstractmethod

get_audit_trail_by_entity(
    entity_id: str, entity_type: str = "dlq_entry"
) -> EntityAuditTrail

Get complete audit trail for a specific entity.

This method provides end-to-end traceability showing all actions from creation to resolution with hash chain verification.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity (e.g., DLQ ID)

required
entity_type str

Type of entity (default: "dlq_entry")

'dlq_entry'

Returns:

Type Description
EntityAuditTrail

EntityAuditTrail with all audit entries and chain verification

Example

trail = stats_repo.get_audit_trail_by_entity("dlq-123")
print(f"Total actions: {trail.total_entries}")
print(f"Chain valid: {trail.is_chain_valid}")
for entry in trail.entries:
    print(f"{entry.timestamp}: {entry.action} by {entry.actor_id}")

link_audit_entry abstractmethod

link_audit_entry(
    entity_id: str,
    entity_type: str,
    action: str,
    actor_id: str | None = None,
    status: str | None = None,
    details: str | None = None,
    audit_record_hash: str | None = None,
) -> bool

Link an audit record to an entity.

Called when audit events are recorded to maintain the relationship between DLQ entries and their audit trail.

Parameters:

Name Type Description Default
entity_id str

Entity identifier

required
entity_type str

Entity type

required
action str

Action performed (store, replay, resolve, etc.)

required
actor_id str | None

Who performed the action

None
status str | None

New status after the action

None
details str | None

Additional details

None
audit_record_hash str | None

Hash from the audit system

None

Returns:

Type Description
bool

True if linked successfully

should_persist_async

should_persist_async() -> bool

Determine if persistence should be done asynchronously.

Override in implementations to enable async persistence. Default returns False for synchronous persistence.

Returns:

Type Description
bool

True if async persistence is preferred

get_async_persist_task_name

get_async_persist_task_name() -> str | None

Get the Celery task name for async persistence.

Returns:

Type Description
str | None

Task name string or None if sync persistence
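
Taken together, should_persist_async and get_async_persist_task_name let a caller choose between inline and queued persistence. The following is a rough sketch of how a runtime layer might use them, not the library's actual dispatch code. The enqueue callback, the task name, and both repository classes are hypothetical. In a Celery deployment, enqueue could be a thin wrapper around the app's send_task.

```python
from typing import Any, Callable, Optional


class SyncRepoSketch:
    """Hypothetical repository that keeps the default synchronous behaviour."""

    def should_persist_async(self) -> bool:
        return False

    def get_async_persist_task_name(self) -> Optional[str]:
        return None

    def persist_entry(self, entry_data: dict[str, Any]) -> Optional[str]:
        return entry_data.get("id")


class AsyncRepoSketch(SyncRepoSketch):
    """Hypothetical repository that opts in to async persistence."""

    def should_persist_async(self) -> bool:
        return True

    def get_async_persist_task_name(self) -> Optional[str]:
        return "hypothetical.persist_dlq_entry"  # made-up task name


def persist(
    repo: Any,
    entry_data: dict[str, Any],
    enqueue: Callable[[str, dict[str, Any]], None],
) -> Optional[str]:
    """Queue the entry when the repo asks for it, otherwise persist inline."""
    task_name = repo.get_async_persist_task_name() if repo.should_persist_async() else None
    if task_name is not None:
        enqueue(task_name, entry_data)
        return None  # the ID is not known until the task has run
    return repo.persist_entry(entry_data)
```

Falling back to inline persistence when no task name is available keeps a misconfigured repository from silently dropping entries.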

EventJournalRepository

Bases: ABC

Baldur event journal storage interface.

Append-only storage. Recorded entries cannot be modified or deleted. Sequence numbers increase monotonically to guarantee order. Gaps may exist (e.g., 1, 2, 4, 5); consumers must not assume contiguity.

Implementations:

- InMemoryEventJournalRepository: for tests and single process
- RedisEventJournalRepository: for multi-worker environments
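
To make the contract concrete, here is a minimal in-memory sketch of the append-only behaviour. It is not the code of InMemoryEventJournalRepository. EntrySketch and JournalSketch are stand-ins, and the sketch assumes the repository assigns sequences starting at 1 and copies the frozen entry instead of mutating it.

```python
from dataclasses import dataclass, field, replace
from datetime import datetime
from typing import Any


@dataclass(frozen=True)
class EntrySketch:
    """Stand-in for JournalEntry with a subset of the documented fields."""

    sequence: int
    event_type: str
    source: str
    timestamp: datetime
    service_name: str
    context: dict[str, Any] = field(default_factory=dict)


class JournalSketch:
    """Append-only store: no update or delete methods are exposed."""

    def __init__(self) -> None:
        self._entries: list[EntrySketch] = []
        self._latest = 0

    def append(self, entry: EntrySketch) -> int:
        # The caller's sequence value is ignored; the journal assigns one.
        self._latest += 1
        self._entries.append(replace(entry, sequence=self._latest))
        return self._latest

    def get_sequence_range(self, start_sequence: int, end_sequence: int) -> list[EntrySketch]:
        # Start is inclusive, end is exclusive.
        return [e for e in self._entries if start_sequence <= e.sequence < end_sequence]

    def get_latest_sequence(self) -> int:
        # 0 while the journal is empty.
        return self._latest
```

This sketch never produces gaps. A multi-worker implementation may, which is why consumers should not rely on contiguity.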

append abstractmethod

append(entry: JournalEntry) -> int

Append an event to the journal.

Parameters:

Name Type Description Default
entry JournalEntry

Journal entry (the sequence field is assigned by the implementation)

required

Returns:

Type Description
int

The assigned sequence number

query abstractmethod

query(
    query_filter: JournalQueryFilter,
) -> JournalQueryResult

Return entries matching the filter, sorted by sequence (ascending).

Parameters:

Name Type Description Default
query_filter JournalQueryFilter

Query criteria

required

Returns:

Type Description
JournalQueryResult

JournalQueryResult with entries (sequence ascending), a truncated flag, and total_count

get_sequence_range abstractmethod

get_sequence_range(
    start_sequence: int, end_sequence: int
) -> list[JournalEntry]

Look up entries by sequence range.

Used by simulations to replay a precise range.

Parameters:

Name Type Description Default
start_sequence int

Start sequence (inclusive)

required
end_sequence int

End sequence (exclusive)

required

Returns:

Type Description
list[JournalEntry]

List of entries sorted by sequence ascending
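
Because the end bound is exclusive and sequences may have gaps, a replay loop is easiest to write over fixed windows of sequence numbers, not over entry counts. The sketch below assumes sequences start at 1 (consistent with get_latest_sequence returning 0 when empty). GappyRepoSketch and replay are hypothetical names, and entries are reduced to bare sequence numbers for brevity.

```python
from typing import Any, Iterator


class GappyRepoSketch:
    """Hypothetical repository whose stored sequences contain a gap."""

    def __init__(self, sequences: list[int]) -> None:
        self._sequences = sorted(sequences)

    def get_latest_sequence(self) -> int:
        return self._sequences[-1] if self._sequences else 0

    def get_sequence_range(self, start_sequence: int, end_sequence: int) -> list[int]:
        # Inclusive start, exclusive end, ascending order.
        return [s for s in self._sequences if start_sequence <= s < end_sequence]


def replay(repo: Any, batch_size: int = 100) -> Iterator[Any]:
    """Yield all entries in order, one half-open window at a time."""
    start = 1
    end_exclusive = repo.get_latest_sequence() + 1
    while start < end_exclusive:
        stop = min(start + batch_size, end_exclusive)
        # A window may return fewer than batch_size entries, or none,
        # when sequence numbers are missing; that is not an error.
        yield from repo.get_sequence_range(start, stop)
        start = stop
```

Adjacent windows share a boundary value without overlapping, so no entry is replayed twice.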

get_latest_sequence abstractmethod

get_latest_sequence() -> int

Return the current latest sequence number. 0 when empty.

count abstractmethod

count(query_filter: JournalQueryFilter) -> int

Return the number of entries matching the filter.

JournalEntry dataclass

JournalEntry(
    sequence: int,
    event_type: str,
    source: str,
    timestamp: datetime,
    service_name: str,
    context: dict[str, Any] = dict(),
    region: str = "",
    tier_id: str = "",
)

Event journal entry. Immutable (frozen) data.

JournalQueryFilter dataclass

JournalQueryFilter(
    event_types: list[str] | None = None,
    service_name: str | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    region: str | None = None,
    limit: int = 1000,
    context_filters: dict[str, str] | None = None,
)

Journal query filter.

JournalQueryResult dataclass

JournalQueryResult(
    entries: list[JournalEntry],
    truncated: bool,
    total_count: int | None = None,
)

Journal query result. Includes whether the result was truncated.