baldur.interfaces — Eventing, Notification & Audit

The unified event-bus protocol (with its Kafka-shaped sub-protocols and NoOp defaults), the notification and alert adapter contracts, the audit-log adapter surface, and the traffic-routing adapter for multi-region failover.

Event bus

EventBusProtocol

Bases: Protocol

Protocol for event bus implementations.

Both BaldurEventBus (L1 in-memory) and RedisEventBus (L2 distributed) implement this protocol. Used as the return type of the unified get_event_bus() factory.

ConsumedEventProtocol

Bases: Protocol

Value-shape Protocol for events consumed from a Kafka topic.

Mirrors the field set on the Dormant ConsumedEvent class. Pure attribute Protocol — no methods. Used by OSS callers that pattern-match on event payload shape without importing the Dormant concrete class.
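
As a rough illustration of how an attribute-only Protocol supports shape-based checks, the sketch below uses a stand-in Protocol. The name ConsumedEventShape and the fields topic, key, and value are assumptions made for the example; the actual field set of ConsumedEventProtocol is not listed on this page.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class ConsumedEventShape(Protocol):
    """Hypothetical stand-in: attribute-only Protocol, no methods."""

    topic: str
    key: Optional[str]
    value: dict[str, Any]


@dataclass
class FakeEvent:
    """Any object with matching attributes satisfies the Protocol structurally."""

    topic: str
    key: Optional[str]
    value: dict[str, Any]


def describe(event: object) -> str:
    # isinstance() on a runtime-checkable Protocol checks attribute presence only,
    # not attribute types.
    if isinstance(event, ConsumedEventShape):
        return f"{event.topic}:{event.key}"
    return "unknown"
```

Because the check is structural, the caller never needs to import the concrete event class.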

KafkaConsumerProtocol

Bases: Protocol

Protocol for Kafka audit consumers.

Implementations: the Dormant KafkaAuditConsumer class. The methods cover only what OSS callers need: starting in a background thread and stopping cleanly.

KafkaEventBusProtocol

Bases: Protocol

Protocol for the Kafka-backed event bus (Producer + Consumer combo).

Implementations: the Dormant KafkaEventBus class. The OSS-facing surface is publish/subscribe + lifecycle methods (start/stop/close/flush).

KafkaProducerProtocol

Bases: Protocol

Protocol for Kafka audit producers (publishes events to Kafka).

Implementations: the Dormant KafkaAuditProducer class. OSS callers obtain instances via ProviderRegistry.kafka_eventbus.get("kafka_producer") or the equivalent factory.

NoOpKafkaEventBus

No-op fallback for the kafka_eventbus registry slot (OSS-safe).

Returned by ProviderRegistry.kafka_eventbus.get() when baldur_dormant is not installed. publish/subscribe silently no-op so OSS callers can use the registry result unconditionally; nothing is ever sent to a broker. Logs at DEBUG to surface accidental wiring on clean-OSS installs (typical Baldur pattern: NoOp logs are quiet).

Satisfies the "NoOp default registration requirement".
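
The fallback behavior described above can be sketched as follows. This is a minimal illustration, not Baldur's implementation: NoOpBus, Slot, and the publish/subscribe signatures are hypothetical stand-ins chosen for the example.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("noop_eventbus_sketch")


class NoOpBus:
    """Hypothetical no-op bus: accepts calls, sends nothing."""

    def publish(self, topic: str, event: dict[str, Any]) -> None:
        logger.debug("NoOp publish to %s dropped", topic)

    def subscribe(self, topic: str, handler: Callable[[dict[str, Any]], None]) -> None:
        logger.debug("NoOp subscribe to %s ignored", topic)


class Slot:
    """Hypothetical registry slot that falls back to a NoOp default."""

    def __init__(self, default: Any) -> None:
        self._default = default
        self._providers: dict[Optional[str], Any] = {}

    def register(self, name: str, provider: Any) -> None:
        self._providers[name] = provider

    def get(self, name: Optional[str] = None) -> Any:
        # Unknown or missing names resolve to the default instead of raising.
        return self._providers.get(name, self._default)


kafka_eventbus = Slot(default=NoOpBus())
bus = kafka_eventbus.get()
bus.publish("audit", {"action": "api_error"})  # safe even with no broker
```

The point of the pattern is that callers never branch on whether the optional package is installed; they only see quieter behavior.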

Notification

NotificationAdapter

Bases: ABC

ABC for notification adapters.

Implement this class to send notifications to your preferred channel. Duck-typed adapters are also accepted via register_notification_adapter(), which auto-registers them as virtual subclasses.

Example

import requests

# SLACK_WEBHOOK_URL is assumed to be defined by the host application.
class SlackNotificationAdapter(NotificationAdapter):
    def send(self, payload: NotificationPayload) -> bool:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            json={"text": f"{payload.title}\n{payload.message}"},
        )
        return response.ok

    def send_batch(self, payloads: list[NotificationPayload]) -> int:
        # Count the payloads that were delivered successfully.
        return sum(1 for p in payloads if self.send(p))

    @property
    def channel(self) -> NotificationChannel:
        return NotificationChannel.SLACK

channel abstractmethod property

channel: NotificationChannel

Return the channel this adapter handles.

send abstractmethod

send(payload: NotificationPayload) -> bool

Send a single notification.

Returns:

Type Description
bool

True if sent successfully, False otherwise

send_batch abstractmethod

send_batch(payloads: list[NotificationPayload]) -> int

Send multiple notifications.

Returns:

Type Description
int

Number of successfully sent notifications

is_available

is_available() -> bool

Check if this adapter is available (configuration validity).

Concrete (non-abstract) method: adapters that subclass the ABC inherit the default implementation, which returns True. Network-based adapters should override it with an I/O-free self-check (e.g., URL format validation).

Duck-typed adapters (virtual subclasses) do NOT inherit this — callers must use getattr(adapter, 'is_available', lambda: True)().
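
The difference between real and virtual subclasses can be shown with a small sketch. Adapter, WebhookAdapter, and DuckAdapter are hypothetical stand-ins; only the getattr fallback is taken from the text above.

```python
from abc import ABC, abstractmethod


class Adapter(ABC):
    """Hypothetical stand-in for NotificationAdapter."""

    @abstractmethod
    def send(self, payload: str) -> bool: ...

    def is_available(self) -> bool:
        return True


class WebhookAdapter(Adapter):
    def __init__(self, url: str) -> None:
        self._url = url

    def send(self, payload: str) -> bool:
        return True  # network call omitted in this sketch

    def is_available(self) -> bool:
        # I/O-free check: validate the URL format only.
        return self._url.startswith("https://")


class DuckAdapter:
    """Duck-typed adapter: has send() but no is_available()."""

    def send(self, payload: str) -> bool:
        return True


# Virtual subclass: isinstance() passes, but no methods are inherited.
Adapter.register(DuckAdapter)


def available(adapter: object) -> bool:
    return getattr(adapter, "is_available", lambda: True)()
```

ABC.register() only affects isinstance/issubclass checks, which is why the getattr fallback is needed for duck-typed adapters.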

NotificationChannel module-attribute

NotificationChannel = MessageChannel

NotificationSeverity module-attribute

NotificationSeverity = MessageSeverity

Alerting

AlertSeverity module-attribute

AlertSeverity = MessageSeverity

AlertCategory

Bases: str, Enum

Alert categories for routing.

Alert dataclass

Alert(
    title: str,
    description: str,
    severity: AlertSeverity = AlertSeverity.WARNING,
    category: AlertCategory = AlertCategory.AVAILABILITY,
    timestamp: datetime = (lambda: utc_now())(),
    source: str = "baldur",
    service_name: str | None = None,
    domain: str | None = None,
    slo_name: str | None = None,
    slo_target: float | None = None,
    slo_current: float | None = None,
    details: dict[str, Any] = dict(),
    runbook_url: str | None = None,
    alert_key: str | None = None,
)

Bases: SerializableMixin

Alert containing all relevant context.

Captures:

  - What happened (title, description)
  - How severe (severity)
  - What category (category)
  - Where (source, service_name)
  - Additional context (details)

key property

key: str

Generate alert key for deduplication.

to_json

to_json() -> str

Convert to JSON string.

AlertAdapter

Bases: ABC

Abstract interface for alerting.

Implementations can send alerts to:

  - stdout (StdoutAlertAdapter)
  - Files (FileAlertAdapter)
  - Slack/Teams (user implements)
  - PagerDuty/OpsGenie (user implements)
  - Email (user implements)
  - Nowhere (NullAlertAdapter)

send abstractmethod

send(alert: Alert) -> None

Send an alert.

Parameters:

Name Type Description Default
alert Alert

The alert to send

required

resolve abstractmethod

resolve(alert_key: str) -> None

Resolve/close an alert.

Parameters:

Name Type Description Default
alert_key str

The key of the alert to resolve

required
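
A minimal sketch of how send() and resolve() pair up through the alert key. The reduced Alert and AlertAdapter classes here are stand-ins, and the key derivation (explicit alert_key first, otherwise source, service, and title) is an assumption; the real key format is not documented on this page.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class Alert:
    """Hypothetical, reduced stand-in for the Alert dataclass."""

    title: str
    description: str
    source: str = "baldur"
    service_name: Optional[str] = None
    alert_key: Optional[str] = None

    @property
    def key(self) -> str:
        # Assumption: an explicit alert_key wins, else derive one from the fields.
        return self.alert_key or f"{self.source}:{self.service_name}:{self.title}"


class AlertAdapter(ABC):
    @abstractmethod
    def send(self, alert: Alert) -> None: ...

    @abstractmethod
    def resolve(self, alert_key: str) -> None: ...


class InMemoryAlertAdapter(AlertAdapter):
    """Keeps open alerts in a dict keyed by Alert.key, so repeats are deduplicated."""

    def __init__(self) -> None:
        self.open_alerts: dict[str, Alert] = {}

    def send(self, alert: Alert) -> None:
        self.open_alerts[alert.key] = alert

    def resolve(self, alert_key: str) -> None:
        # Resolving an unknown key is a no-op rather than an error.
        self.open_alerts.pop(alert_key, None)
```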

alert_cb_opened

alert_cb_opened(
    service_name: str,
    failure_count: int,
    threshold: int,
    is_manual: bool = False,
) -> None

Convenience method for Circuit Breaker open alert.

alert_cb_closed

alert_cb_closed(
    service_name: str, is_manual: bool = False
) -> None

Convenience method for Circuit Breaker close (resolves alert).

alert_dlq_threshold

alert_dlq_threshold(
    domain: str, pending_count: int, threshold: int
) -> None

Convenience method for DLQ threshold alert.

alert_slo_violation

alert_slo_violation(
    slo_name: str,
    target: float,
    current: float,
    service_name: str | None = None,
) -> None

Convenience method for SLO violation alert.

alert_high_error_rate

alert_high_error_rate(
    service_name: str, error_rate: float, threshold: float
) -> None

Convenience method for high error rate alert.

alert_failsafe_activated

alert_failsafe_activated(
    component: str,
    error_message: str,
    fallback_action: str = "PROCEED",
) -> None

CRITICAL: Fail-Safe mode activation alert.

Sent when part of the Baldur system fails and transitions into Fail-Safe mode. This alert requires immediate attention.

Parameters:

Name Type Description Default
component str

Component that failed (e.g., "error_budget", "circuit_breaker")

required
error_message str

Failure cause message

required
fallback_action str

Fallback action taken (e.g., "PROCEED", "ALLOW")

'PROCEED'

Note

This alert is designed to prevent "silent failures". When Fail-Safe activates, the system continues operating, but the operations team must be notified immediately to address the root cause.
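
One way a caller might wire this alert into a fail-safe path is sketched below. check_with_failsafe is a hypothetical helper, and the alert callback is passed in as a plain function so the example runs without Baldur installed; only the three argument names come from the signature above.

```python
from typing import Callable

AlertFn = Callable[[str, str, str], None]


def check_with_failsafe(
    component: str,
    check: Callable[[], str],
    alert_failsafe_activated: AlertFn,
    fallback_action: str = "PROCEED",
) -> str:
    """Run a governance check; on failure, alert and fall back instead of raising."""
    try:
        return check()
    except Exception as exc:
        # The system keeps running, but the failure is never silent.
        alert_failsafe_activated(component, str(exc), fallback_action)
        return fallback_action


sent: list[tuple[str, str, str]] = []


def broken_check() -> str:
    raise RuntimeError("redis unavailable")


result = check_with_failsafe(
    "error_budget",
    broken_check,
    lambda component, message, action: sent.append((component, message, action)),
)
```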

resolve_failsafe

resolve_failsafe(component: str) -> None

Resolve the alert when Fail-Safe recovers.

alert_failsafe_recovered

alert_failsafe_recovered(
    component: str,
    downtime_seconds: float,
    recovery_reason: str = "System recovered automatically",
) -> None

Recovery notification: sent when normal operation is restored from Fail-Safe mode.

Like a PagerDuty/OpsGenie "resolved" event, it actively announces that the failure has been cleared.

Parameters:

Name Type Description Default
component str

Component that recovered (e.g., "error_budget", "circuit_breaker")

required
downtime_seconds float

Failure duration (seconds)

required
recovery_reason str

Description of the recovery cause

'System recovered automatically'

Note

This alert prevents "silent recoveries". When a failure is cleared it is explicitly announced, so the operations team does not need to keep tracking the failure state.
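
A caller needs to know how long the component was down in order to fill in downtime_seconds. The sketch below shows one possible way to track that; FailsafeTracker and its callback are hypothetical and not part of the documented interface.

```python
import time
from typing import Callable


class FailsafeTracker:
    """Hypothetical helper pairing each activation with an explicit recovery notice."""

    def __init__(
        self,
        on_recovered: Callable[[str, float], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._on_recovered = on_recovered
        self._clock = clock
        self._active_since: dict[str, float] = {}

    def activated(self, component: str) -> None:
        # Keep the first activation time if the component is already failing.
        if component not in self._active_since:
            self._active_since[component] = self._clock()

    def recovered(self, component: str) -> None:
        started = self._active_since.pop(component, None)
        if started is not None:
            # Report (component, downtime_seconds) exactly once per outage.
            self._on_recovered(component, self._clock() - started)
```

A monotonic clock is used so that wall-clock adjustments during an outage do not distort the reported duration.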

alert_override_escalation

alert_override_escalation(
    override_type: str,
    requester: str,
    reason: str,
    service_name: str | None = None,
    escalation_channel: str = "#governance",
    escalation_mention: str = "@cto @security",
) -> None

Override escalation alert: sent when a deployment is forced through despite Error Budget exhaustion.

Netflix CAB (Change Advisory Board) style — when a deployment is forced through while the Error Budget is exhausted, escalates to a senior decision-maker / governance channel.

Parameters:

Name Type Description Default
override_type str

Override type (hotfix, security_patch, business_critical, etc.)

required
requester str

Override requester

required
reason str

Override reason

required
service_name str | None

Target service name

None
escalation_channel str

Escalation channel (e.g., #governance)

'#governance'
escalation_mention str

Mention target (e.g., @cto @security)

'@cto @security'

Note

This alert provides visibility into "risky actions". Every action that bypasses the Error Budget policy must be tracked.

Audit

AuditAction

Bases: str, Enum

Standard audit action types.

DLQ_FORCE_REDRIVE class-attribute instance-attribute

DLQ_FORCE_REDRIVE = 'dlq_force_redrive'

Operator re-drive that overrides the cap for an entry that has reached it (REQUIRES_REVIEW). This is a privileged, audited action, distinct from a normal replay.

API_ERROR class-attribute instance-attribute

API_ERROR = 'api_error'

Error raised while handling an API request.

VALIDATION_FAILED class-attribute instance-attribute

VALIDATION_FAILED = 'validation_failed'

Input validation failed.

AUTHORIZATION_DENIED class-attribute instance-attribute

AUTHORIZATION_DENIED = 'authorization_denied'

Authorization denied (no permission).

FORENSIC_CAPTURE_COMPLETED class-attribute instance-attribute

FORENSIC_CAPTURE_COMPLETED = 'forensic_capture_completed'

Forensic context recorded after a captured exception.
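
Because AuditAction mixes in str, its members interoperate with plain strings, which is why the signatures on this page accept AuditAction | str. The sketch below demonstrates this standard-library behavior with a stand-in enum that reuses two of the values listed above.

```python
import json
from enum import Enum


class Action(str, Enum):
    """Stand-in with two of the documented values."""

    API_ERROR = "api_error"
    VALIDATION_FAILED = "validation_failed"


# Members compare equal to their plain-string values.
assert Action.API_ERROR == "api_error"

# A stored string can be turned back into the member.
assert Action("validation_failed") is Action.VALIDATION_FAILED

# JSON encoding emits the underlying string value.
encoded = json.dumps({"action": Action.API_ERROR})
```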

AuditEntry dataclass

AuditEntry(
    action: AuditAction | str,
    timestamp: datetime = (lambda: utc_now())(),
    actor_id: str | None = None,
    actor_type: str = "system",
    actor_roles: list[str] = list(),
    context_type: ContextType = ContextType.UNKNOWN,
    target_type: str | None = None,
    target_id: str | None = None,
    service_name: str | None = None,
    domain: str | None = None,
    reason: str | None = None,
    details: dict[str, Any] = dict(),
    success: bool = True,
    error_message: str | None = None,
)

Audit log entry containing all relevant context.

Captures:

  - What happened (action)
  - Who did it (actor_id, actor_type), sourced automatically from ActorContext
  - What was affected (target_type, target_id)
  - Why (reason)
  - Additional context (details)

Note

When actor_id and actor_type are not set explicitly, they are pulled from ActorContext automatically. This auto-tracks "who set this and when".

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

to_json

to_json() -> str

Convert to JSON string.

from_dict classmethod

from_dict(data: dict[str, Any]) -> AuditEntry

Inverse of to_dict() — round-trip safe.

Unknown fields (e.g. integrity, checksum, audit_id added by ResilientContinuousAuditRecorder) are preserved in details under their original keys to maintain forensic completeness.

Forward-compatibility design (intentional, no schema_version field needed):

  1. New first-class field added later: old from_dict() running on new data — the unknown key falls into details, no exception. New from_dict() running on old data — data.get(key) returns None, default applies. Round-trip is preserved at every point in time.
  2. Field promoted from details to first-class: requires a one-time migration script. A schema_version field cannot solve this automatically.
  3. Field semantic change: cannot be auto-detected without a version field, but also cannot be auto-handled. Always requires coordinated reader/writer updates.

Conclusion: a schema_version meta field would add ceremony without solving the cases that actually need solving. The known-set + details-overflow design covers the only case that benefits from automation, and does so without ceremony.
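
The known-set plus details-overflow design can be sketched as follows. Entry is a reduced, hypothetical stand-in for AuditEntry, and the body of from_dict() is an illustration of the described behavior rather than the library's code.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Optional


@dataclass
class Entry:
    """Hypothetical, reduced stand-in for AuditEntry."""

    action: str
    reason: Optional[str] = None
    details: dict[str, Any] = field(default_factory=dict)
    success: bool = True

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Entry":
        known = {f.name for f in fields(cls)}
        kwargs = {k: v for k, v in data.items() if k in known}
        # Copy details so the caller's dict is not mutated.
        details = dict(kwargs.get("details") or {})
        # Unknown keys overflow into details under their original names.
        for k, v in data.items():
            if k not in known:
                details[k] = v
        kwargs["details"] = details
        # Known keys missing from data fall back to the dataclass defaults.
        return cls(**kwargs)
```

With this shape, an older reader that receives a dict containing a newer key such as checksum keeps the value in details instead of raising.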

Parameters:

Name Type Description Default
data dict[str, Any]

Dict produced by to_dict() (or a superset).

required

Returns:

Type Description
AuditEntry

AuditEntry reconstructed from the dict.

AuditLogAdapter

Bases: ABC

Abstract interface for audit logging.

Implementations can store audit logs in:

  - Files (FileAuditLogAdapter)
  - stdout (StdoutAuditLogAdapter)
  - Database (user implements)
  - External services like Loki, Datadog (user implements)
  - Nowhere (NullAuditLogAdapter)

log abstractmethod

log(entry: AuditEntry) -> None

Log an audit entry.

Parameters:

Name Type Description Default
entry AuditEntry

The audit entry to log

required

query abstractmethod

query(
    action: AuditAction | str | None = None,
    target_type: str | None = None,
    target_id: str | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    limit: int = 100,
) -> list[AuditEntry]

Query audit logs (optional - may not be supported by all adapters).

Parameters:

Name Type Description Default
action AuditAction | str | None

Filter by action type

None
target_type str | None

Filter by target type

None
target_id str | None

Filter by target ID

None
start_time datetime | None

Filter from this time

None
end_time datetime | None

Filter until this time

None
limit int

Maximum entries to return

100

Returns:

Type Description
list[AuditEntry]

List of matching audit entries
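
A minimal in-memory sketch of the log()/query() pair, assuming that every filter left as None is ignored, that time bounds are inclusive, and that limit keeps the earliest matches. Those three choices, along with the reduced Entry class, are assumptions of this example and are not stated by the interface.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Entry:
    """Hypothetical, reduced stand-in for AuditEntry."""

    action: str
    target_type: Optional[str] = None
    target_id: Optional[str] = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class InMemoryAuditLog:
    """Hypothetical adapter: log() appends, query() applies every non-None filter."""

    def __init__(self) -> None:
        self._entries: list[Entry] = []

    def log(self, entry: Entry) -> None:
        self._entries.append(entry)

    def query(
        self,
        action: Optional[str] = None,
        target_type: Optional[str] = None,
        target_id: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100,
    ) -> list[Entry]:
        matches = [
            e
            for e in self._entries
            if (action is None or e.action == action)
            and (target_type is None or e.target_type == target_type)
            and (target_id is None or e.target_id == target_id)
            and (start_time is None or e.timestamp >= start_time)
            and (end_time is None or e.timestamp <= end_time)
        ]
        return matches[:limit]
```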

log_cb_open

log_cb_open(
    service_name: str,
    reason: str,
    actor_id: str | None = None,
    is_manual: bool = True,
) -> None

Convenience method for Circuit Breaker open.

log_cb_close

log_cb_close(
    service_name: str,
    reason: str,
    actor_id: str | None = None,
    is_manual: bool = True,
    trigger_replay: bool = False,
) -> None

Convenience method for Circuit Breaker close.

log_dlq_store

log_dlq_store(
    dlq_id: int,
    domain: str,
    failure_type: str,
    error_message: str | None = None,
) -> None

Convenience method for DLQ storage.

log_dlq_replay

log_dlq_replay(
    dlq_id: int,
    domain: str,
    success: bool,
    actor_id: str | None = None,
    error_message: str | None = None,
) -> None

Convenience method for DLQ replay.

log_retry

log_retry(
    domain: str,
    func_name: str,
    attempt: int,
    max_attempts: int,
    success: bool,
    error_message: str | None = None,
) -> None

Convenience method for retry attempts.

log_governance_blocked

log_governance_blocked(
    block_reason: str,
    operation_name: str,
    details: dict[str, Any] | None = None,
    service_name: str | None = None,
    domain: str | None = None,
) -> None

Record an entry when automation was blocked by governance.

The recorded entry gives a concrete answer to the question "why didn't the operation run at this time?".

Parameters:

Name Type Description Default
block_reason str

Block reason (kill_switch, emergency_mode, error_budget)

required
operation_name str

Name of the blocked operation

required
details dict[str, Any] | None

Additional details (emergency_level, budget_percent, etc.)

None
service_name str | None

Related service name

None
domain str | None

Domain (payment, point, etc.)

None

NoOpKafkaAuditAdapter

Bases: AuditLogAdapter

Kafka-shaped audit NoOp returned by audit_kafka_adapter slot.

When baldur_dormant is not installed, the ProviderRegistry.audit_kafka_adapter slot returns this adapter so OSS callers can request the Kafka audit path unconditionally without needing to check for the underlying broker. log() silently drops the entry (logging at DEBUG so operational tracing remains possible); query() returns an empty list — there is no backing topic.

NoOpWormAdapter

Bases: AuditLogAdapter

WORM-shaped audit NoOp returned by audit_worm_adapter slot.

The real WORM adapters (S3 Object Lock, Loki, sidecar) live in the Dormant package. When the Dormant package is absent, this NoOp answers the registry slot so callers don't need to guard against missing providers. log() drops entries silently, so the compliance/non-repudiation guarantee that the real WORM adapters provide does not hold. Operators who rely on it must install baldur-pro[dormant,aws] explicitly.

verify

verify() -> bool

Verify the WORM store integrity (NoOp: trivially True).

Real implementations check Object Lock retention, hash-chain continuity, etc. The NoOp returns True because there is nothing stored to violate.
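
Since verify() returns True on the NoOp, a passing check alone does not prove that a WORM store is in place. The sketch below mirrors the documented NoOp behavior with a stand-in class and adds a hypothetical startup guard, require_worm, that a deployment with compliance requirements might use.

```python
import logging

logger = logging.getLogger("noop_audit_sketch")


class NoOpAudit:
    """Hypothetical stand-in mirroring the NoOp adapters' documented behavior."""

    def log(self, entry: object) -> None:
        logger.debug("NoOp audit adapter dropped entry: %r", entry)

    def query(self, **filters: object) -> list:
        return []  # no backing store

    def verify(self) -> bool:
        return True  # nothing stored, so nothing can be violated


def require_worm(adapter: object) -> None:
    """Fail fast at startup when compliance requires a real WORM store."""
    if isinstance(adapter, NoOpAudit):
        raise RuntimeError(
            "WORM audit storage required but only a NoOp adapter is registered"
        )
```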

Traffic routing

RoutingChange dataclass

RoutingChange(
    success: bool,
    from_region: str,
    to_region: str,
    details: dict[str, Any] = dict(),
    rollback_info: dict[str, Any] | None = None,
)

Traffic-routing change result.

Captures the result of a switch_primary() call and carries the information needed by rollback() to restore the previous state.

Attributes:

Name Type Description
success bool

Whether the switch succeeded

from_region str

Previous Primary region

to_region str

New Primary region

details dict[str, Any]

Switch details

rollback_info dict[str, Any] | None

Previous-state info for rollback

success instance-attribute

success: bool

Whether the switch succeeded.

from_region instance-attribute

from_region: str

Previous Primary region.

to_region instance-attribute

to_region: str

New Primary region.

details class-attribute instance-attribute

details: dict[str, Any] = field(default_factory=dict)

Switch details.

rollback_info class-attribute instance-attribute

rollback_info: dict[str, Any] | None = None

Previous-state info for rollback.

TrafficRoutingAdapter

Bases: ABC

Traffic routing adapter interface.

Abstract interface for shifting traffic at the DNS/LB layer during a regional outage. The baldur package does not include any external cloud SDK, so production adapters are implemented in the host app and registered with the ProviderRegistry.

Default implementation (LoggingTrafficRoutingAdapter): Operates at the application level only, without DNS/LB changes. Publishes a REGION_PRIMARY_CHANGED event via RedisEventBus so that ServiceLocalityRouter can refresh its routing table.

Example (AWS Route53):

import boto3

class Route53TrafficRouter(TrafficRoutingAdapter):
    def __init__(self, hosted_zone_id: str):
        self._client = boto3.client('route53')
        self._zone_id = hosted_zone_id

    def switch_primary(self, from_region, to_region) -> RoutingChange:
        self._client.change_resource_record_sets(
            HostedZoneId=self._zone_id,
            ChangeBatch={...}
        )
        return RoutingChange(
            success=True,
            from_region=from_region,
            to_region=to_region,
            details={"dns_updated": True},
        )

    def rollback(self, routing_change) -> bool:
        return self.switch_primary(
            routing_change.to_region,
            routing_change.from_region,
        ).success

    def get_current_routing(self) -> dict:
        return {"hosted_zone": self._zone_id}

Example (K8s Ingress):

class K8sIngressTrafficRouter(TrafficRoutingAdapter):
    def switch_primary(self, from_region, to_region) -> RoutingChange:
        # kubectl patch ingress ...
        ...

Registration

ProviderRegistry.register_traffic_routing("route53", Route53TrafficRouter)

switch_primary abstractmethod

switch_primary(
    from_region: str, to_region: str
) -> RoutingChange

Switch the Primary region.

Shifts traffic to to_region at the DNS/LB layer.

Parameters:

Name Type Description Default
from_region str

Current Primary region

required
to_region str

New Primary region

required

Returns:

Type Description
RoutingChange

RoutingChange result (includes rollback info)

rollback abstractmethod

rollback(routing_change: RoutingChange) -> bool

Roll back a routing change.

Parameters:

Name Type Description Default
routing_change RoutingChange

The return value of switch_primary()

required

Returns:

Type Description
bool

True if the rollback succeeded

get_current_routing abstractmethod

get_current_routing() -> dict[str, Any]

Query the current routing state.
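
The three methods can be exercised together with an in-memory sketch that uses rollback_info to restore the previous state. InMemoryRouter and the previous_primary key are hypothetical, RoutingChange is redeclared so the example is self-contained, and refusing a switch from a stale from_region is a design choice of this sketch rather than documented behavior.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class RoutingChange:
    success: bool
    from_region: str
    to_region: str
    details: dict[str, Any] = field(default_factory=dict)
    rollback_info: Optional[dict[str, Any]] = None


class InMemoryRouter:
    """Hypothetical adapter that tracks the primary region in memory."""

    def __init__(self, primary: str) -> None:
        self._primary = primary

    def switch_primary(self, from_region: str, to_region: str) -> RoutingChange:
        if from_region != self._primary:
            # Refuse to switch from a stale view of the current primary.
            return RoutingChange(
                False,
                from_region,
                to_region,
                details={"error": "from_region is not the current primary"},
            )
        previous = self._primary
        self._primary = to_region
        return RoutingChange(
            True,
            from_region,
            to_region,
            rollback_info={"previous_primary": previous},
        )

    def rollback(self, routing_change: RoutingChange) -> bool:
        if not routing_change.success or routing_change.rollback_info is None:
            return False
        self._primary = routing_change.rollback_info["previous_primary"]
        return True

    def get_current_routing(self) -> dict[str, Any]:
        return {"primary": self._primary}
```

Storing the previous state in rollback_info lets rollback() work from the RoutingChange alone, without consulting any other record of what the routing used to be.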