baldur_pro.services.unified_notification — Unified Notification
Multi-channel notification dispatch: NotificationPayload,
UnifiedNotificationManager, and the notify convenience family.
🔒 PRO Feature — requires a baldur-pro license
These symbols ship in the baldur-pro distribution. PRO modules import
normally — there is no ImportError. PRO features activate only when
baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system
runs with OSS defaults and register_pro_services() logs
entitlement.pro_registration_skipped.
unified_notification
Unified Notification Manager
Central notification management for the Baldur system. Consolidates all notification logic into a single point of control, solving the problem of fragmented notification sources.
Architecture Problem Solved: - Before: 4 scattered notification sources (AlertAdapter, SecurityNotificationService, GateAlertManager, GovernanceService._send_notification) - After: Single UnifiedNotificationManager that routes all notifications
Key Features: - Centralized notification routing - Policy-based channel selection - Rate limiting and cooldown - Audit trail integration - Emergency level escalation
Reference
92_CONFIG_IMPLEMENTATION_GUIDE.md Week 3 [15] NotificationChannelSettings.
Status: Public
NotificationCategory
Bases: str, Enum
Notification categories for routing and filtering.
NotificationPayload
dataclass
NotificationPayload(
title: str,
message: str,
priority: NotificationPriority = NotificationPriority.MEDIUM,
category: NotificationCategory = NotificationCategory.OPERATIONS,
source: str = "unknown",
task_name: str | None = None,
task_id: str | None = None,
metadata: dict[str, Any] = dict(),
tags: list[str] = list(),
timestamp: datetime = (lambda: utc_now())(),
channels: list[str] | None = None,
dedup_key: str | None = None,
)
Bases: SerializableMixin
Unified notification payload.
All notification sources construct this payload so the downstream manager can route, dedup, and deliver uniformly.
source
class-attribute
instance-attribute
source: str = 'unknown'
Originating subsystem (e.g., drift_detection, circuit_breaker).
channels
class-attribute
instance-attribute
channels: list[str] | None = None
Routing hint; the manager may override.
dedup_key
class-attribute
instance-attribute
dedup_key: str | None = None
Optional key used for cooldown deduplication.
NotificationPriority
Bases: str, Enum
Notification priority levels.
CRITICAL
class-attribute
instance-attribute
CRITICAL = 'critical'
Immediate, all channels.
HIGH
class-attribute
instance-attribute
HIGH = 'high'
Urgent, Slack + Email.
MEDIUM
class-attribute
instance-attribute
MEDIUM = 'medium'
Normal, Slack only.
LOW
class-attribute
instance-attribute
LOW = 'low'
Low urgency, Slack only.
INFO
class-attribute
instance-attribute
INFO = 'info'
Log only unless configured.
NotificationResult
dataclass
NotificationResult(
success: bool,
channels_sent: list[str] = list(),
channels_failed: list[str] = list(),
suppressed: bool = False,
suppression_reason: str | None = None,
error: str | None = None,
)
Bases: SerializableMixin
Result of notification attempt.
ResolvedChannel
dataclass
ResolvedChannel(
type: str, target: str | list[str] | None = None
)
A fully resolved notification channel with concrete target.
Produced by ChannelResolver, consumed by SNS.deliver().
Attributes:
| Name | Type | Description |
|---|---|---|
type |
str
|
Channel type identifier ("slack", "email", "sms", "pagerduty") |
target |
str | list[str] | None
|
Concrete delivery target. str for Slack channel name, list[str] for email/SMS recipients, None for config-based (PagerDuty). |
ChannelResolver
ChannelResolver(
routing_settings=None,
notification_settings=None,
target_settings=None,
)
Full-resolution channel resolver.
Determines both channel types ("slack", "email") and concrete targets ("#security-incidents", ["[email protected]"]) from settings.
Single public API: resolve(priority, category, type_filter)
from_settings
classmethod
from_settings() -> ChannelResolver
Create ChannelResolver from current settings.
resolve
resolve(
priority: NotificationPriority,
category: NotificationCategory,
type_filter: list[str] | None = None,
) -> list[ResolvedChannel]
Resolve channels for a notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
priority
|
NotificationPriority
|
Notification priority level |
required |
category
|
NotificationCategory
|
Notification category |
required |
type_filter
|
list[str] | None
|
Optional list of channel types to restrict to (from payload.channels). Acts as a filter, not an override. |
None
|
Returns:
| Type | Description |
|---|---|
list[ResolvedChannel]
|
List of ResolvedChannel with concrete targets. |
get_cooldown
get_cooldown(category: NotificationCategory) -> int
Get cooldown seconds for a category.
UnifiedNotificationManager
UnifiedNotificationManager(
resolver: ChannelResolver | None = None,
cache: CacheProviderInterface | None = None,
)
Bases: EventEmitterMixin
Central manager for all baldur notifications.
Provides a single point of control for: - Channel routing based on priority/category via ChannelResolver - Rate limiting and cooldown - Emergency level escalation - Audit trail integration - Event emission for delivery failures (NOTIFICATION_DELIVERY_FAILED)
Usage
manager = get_unified_notification_manager()
result = manager.notify(NotificationPayload( title="SLA Drift Warning", message="Payment domain exceeded threshold", priority=NotificationPriority.HIGH, category=NotificationCategory.SLA, source="drift_detection", ))
cache
property
cache: CacheProviderInterface | None
Lazy-resolve the cache provider for the distributed cooldown lock.
Resolution is attempted once; on failure it logs a WARNING and returns None forever (local-only cooldown mode). A notification is a side-effect, so an unavailable cache fails open to the per-process in-memory verdict rather than blocking delivery.
getattr with defaults tolerates instances constructed via new (test fixtures) that bypass init.
notify
notify(payload: NotificationPayload) -> NotificationResult
Send a notification through the unified manager.
Handles routing, cooldown, escalation, and audit.
reset_cooldowns
reset_cooldowns() -> None
Reset all cooldowns and counters (for testing).
Owner-fenced: releases only the distributed windows THIS process created (the held-lock snapshot); remote windows expire via TTL by design. Release I/O runs after the mutex is dropped.
get_stats
get_stats() -> dict[str, Any]
Get notification statistics.
notify
notify(
title: str,
message: str,
priority: str = "medium",
category: str = "operations",
source: str = "unknown",
**kwargs: Any
) -> NotificationResult
Convenience function for sending notifications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
title
|
str
|
Notification title |
required |
message
|
str
|
Notification message |
required |
priority
|
str
|
Priority level (critical, high, medium, low, info) |
'medium'
|
category
|
str
|
Category (security, operations, sla, etc.) |
'operations'
|
source
|
str
|
Source identifier |
'unknown'
|
**kwargs
|
Any
|
Additional payload fields |
{}
|
Returns:
| Type | Description |
|---|---|
NotificationResult
|
NotificationResult |
Usage
from baldur_pro.services.unified_notification import notify
notify( title="SLA Drift Warning", message="Payment domain exceeded 20% threshold", priority="high", category="sla", source="drift_detection", metadata={"domain": "payment", "rate": 25.0}, )
notify_error
notify_error(
title: str,
message: str,
error: Exception,
source: str = "unknown",
**kwargs
) -> NotificationResult
Convenience function for error notifications.
notify_incident
notify_incident(
incident_id: int,
incident_type: str,
severity: str = "high",
description: str = "",
source_ip: str | None = None,
user_id: int | None = None,
action_taken: str = "",
**kwargs: Any
) -> NotificationResult
Security incident notification with rich formatting.
Combines incident formatting (from security_notification/formatters.py) with UNM routing in one call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
incident_id
|
int
|
The security incident ID |
required |
incident_type
|
str
|
Type of incident |
required |
severity
|
str
|
Severity level |
'high'
|
description
|
str
|
Description of the incident |
''
|
source_ip
|
str | None
|
Source IP address |
None
|
user_id
|
int | None
|
Associated user ID |
None
|
action_taken
|
str
|
Action taken in response |
''
|
**kwargs
|
Any
|
Additional payload fields (source, metadata) |
{}
|
Returns:
| Type | Description |
|---|---|
NotificationResult
|
NotificationResult |
notify_security
notify_security(
title: str,
message: str,
priority: str = "high",
source: str = "security",
**kwargs
) -> NotificationResult
Convenience function for security notifications.
notify_sla
notify_sla(
title: str,
message: str,
domain: str,
priority: str = "medium",
source: str = "sla_monitor",
**kwargs
) -> NotificationResult
Convenience function for SLA-related notifications.
get_unified_notification_manager
get_unified_notification_manager() -> (
UnifiedNotificationManager
)
Get the singleton UnifiedNotificationManager instance.
reset_notification_manager
reset_notification_manager() -> None
Reset the notification manager singleton (for testing).