Skip to content

baldur.services — Security & Notification

Security-violation detection (service, config, violation/severity enums, the severity lookup table) and the OSS notification value types that surface incidents. The delivery service (SecurityNotificationService) ships in PRO — see its reference.

Violation detection

SecurityViolationService

SecurityViolationService(
    config: SecurityConfig | None = None,
    repository: SecurityIncidentRepository | None = None,
    cache: CacheProviderInterface | None = None,
)

Service for handling security violations.

Security violations are NEVER auto-recovered. They are: 1. Immediately blocked 2. Logged with full forensic context 3. Routed to security team for investigation

Usage

service = SecurityViolationService() result = service.handle_violation( violation_type=ViolationType.SIGNATURE_INVALID, request_info={"ip": "1.2.3.4", "user_agent": "..."}, description="Signature validation failed", )

For testing with mock repository

mock_repo = Mock(spec=SecurityIncidentRepository) service = SecurityViolationService(repository=mock_repo)

Initialize the security violation service.

Parameters:

Name Type Description Default
config SecurityConfig | None

Optional configuration, loads from settings if None

None
repository SecurityIncidentRepository | None

Optional repository for DI, uses default adapter if None

None
cache CacheProviderInterface | None

Optional cache provider for DI, uses default if None

None

repository property

repository: SecurityIncidentRepository

Get the repository, creating default adapter if needed.

cache property

cache: CacheProviderInterface

Get the cache provider, creating default if needed.

handle_violation

handle_violation(
    violation_type: str | ViolationType,
    request_info: dict[str, Any] | None = None,
    user_id: int | None = None,
    entity_refs: dict[str, int] | None = None,
    description: str = "",
    raw_request_data: dict[str, Any] | None = None,
) -> SecurityViolationResult

Handle a security violation.

This method: 1. Creates a SecurityIncident record via repository 2. Takes immediate protective action based on violation type 3. Triggers security team notification 4. Returns result with action taken

Parameters:

Name Type Description Default
violation_type str | ViolationType

Type of security violation

required
request_info dict[str, Any] | None

Request info dict with 'ip', 'user_agent' keys

None
user_id int | None

Associated user ID (if authenticated)

None
entity_refs dict[str, int] | None

Related entity references (e.g., {"order_id": 123})

None
description str

Detailed description of the violation

''
raw_request_data dict[str, Any] | None

Sanitized request data for forensics

None

Returns:

Type Description
SecurityViolationResult

SecurityViolationResult with incident ID and action taken

record_violation

record_violation(
    violation_type: str,
    details: dict[str, Any] | None = None,
    request_info: dict[str, Any] | None = None,
    user_id: int | None = None,
) -> SecurityViolationResult

Simplified interface for recording a violation.

This is a convenience method that wraps handle_violation for cases like CorruptionShield where simpler parameter passing is needed.

Parameters:

Name Type Description Default
violation_type str

Type of violation (e.g., "corruption_injection_attempt")

required
details dict[str, Any] | None

Violation details dict (becomes description + raw_request_data)

None
request_info dict[str, Any] | None

Optional request info with 'ip', 'user_agent'

None
user_id int | None

Optional associated user ID

None

Returns:

Type Description
SecurityViolationResult

SecurityViolationResult with incident ID and action taken

is_ip_banned

is_ip_banned(ip_address: str) -> bool

Check if an IP address is banned.

SecurityViolationResult dataclass

SecurityViolationResult(
    success: bool,
    incident_id: int | None = None,
    action_taken: str = "",
    error: str | None = None,
    protection_result: ProtectionResult | None = None,
)

Result of security violation handling.

handled classmethod

handled(
    incident_id: int,
    action: str,
    protection_result: ProtectionResult | None = None,
) -> SecurityViolationResult

Factory for successfully handled violation.

failed classmethod

failed(error: str) -> SecurityViolationResult

Factory for failed handling.

SecurityConfig dataclass

SecurityConfig(
    rate_limit_window_seconds: int = 60,
    rate_limit_max_requests: int = 100,
    temporary_ban_hours: int = 1,
    permanent_ban_threshold: int = 5,
    suspicious_ip_cache_timeout: int = 86400,
    injection_ban_hours: int = 24,
    failed_login_threshold: int = 5,
    suspicious_ip_cache_prefix: str = "security:suspicious_ip:",
    banned_ip_cache_prefix: str = "security:banned_ip:",
    session_engine: str = "django.contrib.sessions.backends.db",
    session_cookie_age: int = 1209600,
)

Configuration for security violation handling.

from_settings classmethod

from_settings() -> SecurityConfig

Load configuration from settings.

ViolationType

Bases: str, Enum

Types of security violations that never self-heal (domain-neutral).

RECOVERY_LOOP_DETECTED class-attribute instance-attribute

RECOVERY_LOOP_DETECTED = 'recovery_loop_detected'

Recovery/adjustment infinite-loop detected - the most severe.

CONFLICTING_ADJUSTMENT class-attribute instance-attribute

CONFLICTING_ADJUSTMENT = 'conflicting_adjustment'

Conflicting autonomous adjustment detected (e.g., A→B→A repetition).

HEALING_TIMEOUT class-attribute instance-attribute

HEALING_TIMEOUT = 'healing_timeout'

Baldur operation timed out.

FLAPPING_DETECTED class-attribute instance-attribute

FLAPPING_DETECTED = 'flapping_detected'

Parameter flapping detected (repeated micro-adjustments).

ANOMALY_STATISTICAL class-attribute instance-attribute

ANOMALY_STATISTICAL = 'anomaly_statistical'

L3 statistical anomaly detected (Z-score based).

ANOMALY_BEHAVIORAL class-attribute instance-attribute

ANOMALY_BEHAVIORAL = 'anomaly_behavioral'

Behavioral anomaly detected (sequence-pattern deviation).

SCHEMA_VIOLATION class-attribute instance-attribute

SCHEMA_VIOLATION = 'schema_violation'

L1 schema violation (missing required field, type mismatch).

BUSINESS_RULE_VIOLATION class-attribute instance-attribute

BUSINESS_RULE_VIOLATION = 'business_rule_violation'

L2 business-rule violation.

AUDIT_TAMPERING class-attribute instance-attribute

AUDIT_TAMPERING = 'audit_tampering'

Audit-log tampering attempt detected.

HASH_CHAIN_BROKEN class-attribute instance-attribute

HASH_CHAIN_BROKEN = 'hash_chain_broken'

ContinuousAuditRecorder hash-chain integrity violation.

WAL_CORRUPTION class-attribute instance-attribute

WAL_CORRUPTION = 'wal_corruption'

WAL CRC32 checksum mismatch.

UNAUTHORIZED_OVERRIDE class-attribute instance-attribute

UNAUTHORIZED_OVERRIDE = 'unauthorized_override'

Unauthorized configuration-change attempt.

GOVERNANCE_BYPASS_ATTEMPT class-attribute instance-attribute

GOVERNANCE_BYPASS_ATTEMPT = 'governance_bypass_attempt'

Kill Switch/Emergency Mode bypass attempt.

PRIVILEGE_ESCALATION class-attribute instance-attribute

PRIVILEGE_ESCALATION = 'privilege_escalation'

Privilege-escalation attempt.

Severity

Bases: str, Enum

Severity levels for security incidents.

SEVERITY_BY_VIOLATION_TYPE module-attribute

SEVERITY_BY_VIOLATION_TYPE: dict[str, Severity] = {
    SIGNATURE_INVALID: CRITICAL,
    DATA_TAMPERED: CRITICAL,
    TOKEN_FORGED: CRITICAL,
    REPLAY_ATTACK: CRITICAL,
    UNAUTHORIZED_ACCESS: HIGH,
    INJECTION_ATTEMPT: HIGH,
    RATE_LIMIT_ABUSE: MEDIUM,
    SUSPICIOUS_ACTIVITY: MEDIUM,
    RECOVERY_LOOP_DETECTED: CRITICAL,
    CONFLICTING_ADJUSTMENT: HIGH,
    HEALING_TIMEOUT: MEDIUM,
    FLAPPING_DETECTED: HIGH,
    AUDIT_TAMPERING: CRITICAL,
    HASH_CHAIN_BROKEN: CRITICAL,
    WAL_CORRUPTION: CRITICAL,
    GOVERNANCE_BYPASS_ATTEMPT: CRITICAL,
    PRIVILEGE_ESCALATION: CRITICAL,
    ANOMALY_STATISTICAL: HIGH,
    ANOMALY_BEHAVIORAL: HIGH,
    UNAUTHORIZED_OVERRIDE: HIGH,
    BUSINESS_RULE_VIOLATION: HIGH,
    SCHEMA_VIOLATION: MEDIUM,
}

handle_security_violation

handle_security_violation(
    violation_type: str | ViolationType,
    request_info: dict[str, Any] | None = None,
    user_id: int | None = None,
    description: str = "",
    **kwargs: Any
) -> SecurityViolationResult

Convenience function to handle a security violation.

This is the main entry point for security violation handling.

Parameters:

Name Type Description Default
violation_type str | ViolationType

Type of security violation

required
request_info dict[str, Any] | None

Request info dict with 'ip', 'user_agent' keys

None
user_id int | None

Associated user ID

None
description str

Description of the violation

''
**kwargs Any

Additional arguments passed to handle_violation

{}

Returns:

Type Description
SecurityViolationResult

SecurityViolationResult

Notification

The notification value types are OSS; the delivery service that consumes them (SecurityNotificationService) ships in PRO — see the PRO notification service reference.

SecurityNotificationResult dataclass

SecurityNotificationResult(
    incident_id: int,
    results: list[ChannelDeliveryResult] = list(),
)

Aggregate result of all notification attempts.

all_success property

all_success: bool

Check if all notifications were successful.

any_success property

any_success: bool

Check if any notification was successful.

add_result

add_result(result: ChannelDeliveryResult) -> None

Add a notification result.

NotificationConfig dataclass

NotificationConfig(
    slack_webhook_url: str = "",
    slack_critical_channel: str = "#critical-alerts",
    slack_high_channel: str = "#ops-alerts",
    slack_medium_channel: str = "#dev-alerts",
    email_critical_recipients: list[str] = list(),
    email_high_recipients: list[str] = list(),
    sms_critical_recipients: list[str] = list(),
    pagerduty_service_key: str = "",
    pagerduty_enabled: bool = False,
    webhook_urls: list[str] = list(),
    webhook_headers: dict[str, str] = dict(),
    enabled: bool = True,
    dry_run: bool = False,
)

Configuration for security notifications.

from_settings classmethod

from_settings() -> NotificationConfig

Load configuration from ChannelTargetSettings and NotificationSettings.

NotificationChannel module-attribute

NotificationChannel = MessageChannel

ChannelDeliveryResult dataclass

ChannelDeliveryResult(
    channel: str,
    success: bool,
    message: str = "",
    error: str | None = None,
)

Result of a notification attempt.

Singleton accessors

One process-wide singleton accessor rounds out this surface. It is built by a generic singleton factory, so it carries no standalone signature page; call it with no arguments to obtain the shared instance:

The notification service accessor (get_security_notification_service()) ships in PRO — see the PRO reference.