baldur.interfaces — Policy Composition & ML Strategy

The resilience-policy composition protocols (guards, hooks, failure sinks, the policy result/context DTOs) and the AI/ML strategy interfaces that back anomaly detection, forecasting, and classification.

Resilience policy

PolicyOutcome

Bases: str, Enum

Kinds of Policy execution result.

PolicyResult dataclass

PolicyResult(
    value: T | None = None,
    outcome: PolicyOutcome = PolicyOutcome.SUCCESS,
    error: Exception | None = None,
    executed_policies: list[str] = list(),
    total_attempts: int = 1,
    total_duration_ms: float = 0.0,
    metadata: dict[str, Any] = dict(),
)

Bases: Generic[T]

Unified result type for every resilience Policy.

Consolidates each pattern's existing result type into a single shape:
  • RetryResult(success, action, attempt, value, error, dlq_id)
  • FallbackResult(value, used_fallback, fallback_mode, original_error)
  • CircuitBreakerResult is for state management only and is not converted
  • BulkheadFullError exceptions are mapped to PolicyResult(outcome=REJECTED)

Attributes:

Name Type Description
value T | None

Result value (on success)

outcome PolicyOutcome

Kind of execution result

error Exception | None

Exception raised on failure

executed_policies list[str]

Names of policies that ran

total_attempts int

Total number of attempts

total_duration_ms float

Total execution time (milliseconds)

metadata dict[str, Any]

Per-pattern details (optional)

success property

success: bool

Whether the execution succeeded (including Fallback).

executed property

executed: bool

Whether the policy pipeline was executed (always True if PolicyComposer ran).

rejected property

rejected: bool

Whether the execution was rejected by a policy.

PolicyContext dataclass

PolicyContext(
    order_id: str | None = None,
    payment_id: str | None = None,
    user_id: str | None = None,
    tier_id: str | None = None,
    region: str | None = None,
    domain: str = "",
    trace_id: str | None = None,
    extra: dict[str, Any] = dict(),
)

Policy pipeline execution context (immutable).

frozen=True prevents side-effects inside the pipeline. Use dataclasses.replace() (or with_updates()) to produce a copy with edited fields (Copy-on-Write).

Attributes:

Name Type Description
order_id str | None

Order identifier (read by DLQ sink as entity_id).

payment_id str | None

Payment identifier (no current named-field consumer; decorator auto-extract still forwards it via extra["request_data"]).

user_id str | None

User identifier (read by DLQ sink as user_id column).

tier_id str | None

Service tier ("critical" | "standard" | "non_essential").

region str | None

Region identifier (input to ErrorBudgetGate).

domain str

Domain identifier (mirrors RetryConfig.domain).

trace_id str | None

Distributed-trace ID (OTel trace_id).

extra dict[str, Any]

Open-ended extension dict. Conventional keys:
  • request_data (dict): Per-call payload snapshot written by @protected / @dlq_protect auto-extract. The decorator binds the wrapped function's primitive-typed args and writes them here so DLQ entries carry the full payload for operator search (WHERE request_data->>'payment_id'='x'). Direct protect() callers may populate this manually.
  • snapshot_data (dict): Pre-failure snapshot for forensic replay; read by DLQ sink.
  • response_data (dict): Downstream response payload; read by DLQ sink.
  • user_id (str/int): Legacy fallback for direct callers who populate extra without setting PolicyContext.user_id; the named field wins when both are set.

with_updates

with_updates(**kwargs: Any) -> PolicyContext

Copy-on-Write: return a new instance with the given fields replaced.
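
The Copy-on-Write behavior can be illustrated with a cut-down stand-in. MiniContext is a hypothetical class defined only for this sketch (it is not the real PolicyContext), and the sketch assumes with_updates() behaves like dataclasses.replace().

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class MiniContext:
    """Hypothetical stand-in with a subset of PolicyContext's fields."""

    order_id: Optional[str] = None
    region: Optional[str] = None
    domain: str = ""
    extra: dict[str, Any] = field(default_factory=dict)

    def with_updates(self, **kwargs: Any) -> "MiniContext":
        # Assumption: the real method behaves like dataclasses.replace().
        return dataclasses.replace(self, **kwargs)


base = MiniContext(order_id="o-1", domain="payments")
updated = base.with_updates(region="eu-west-1")  # new instance, base untouched

try:
    base.region = "us-east-1"  # frozen=True rejects in-place assignment
    mutated = True
except dataclasses.FrozenInstanceError:
    mutated = False
```

Note that dataclasses.replace() makes a shallow copy: in this sketch the extra dict is shared between base and updated, and frozen=True does not stop code from mutating the dict's contents.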

GuardResult dataclass

GuardResult(
    allowed: bool,
    reason: str | None = None,
    metadata: dict[str, Any] = dict(),
)

Guard validation result.

Attributes:

Name Type Description
allowed bool

Whether execution is allowed (True = pass, False = reject)

reason str | None

Rejection reason (when allowed=False)

metadata dict[str, Any]

Additional information (per-Guard details)

ResiliencePolicy

Bases: Protocol[T]

Core Protocol implemented by synchronous resilience patterns.

Each Policy wraps a function and applies resilience logic. Composition of policies is handled by PolicyComposer.

Exception-handling contract:
  • Business exceptions are wrapped in PolicyResult(outcome=FAILURE, error=e) and returned
  • Because implementations catch "except Exception", KeyboardInterrupt and SystemExit (which derive from BaseException, not Exception) propagate automatically
  • BulkheadFullError is converted to PolicyResult(outcome=REJECTED) by the Policy wrapper

name property

name: str

Policy identifier (e.g., 'retry', 'circuit_breaker', 'bulkhead').

execute

execute(
    func: Callable[..., T],
    *args: Any,
    context: PolicyContext | None = None,
    **kwargs: Any
) -> PolicyResult[T]

Wrap a function in the Policy and execute it.

Parameters:

Name Type Description Default
func Callable[..., T]

Function to execute

required
*args Any

Function positional arguments

()
context PolicyContext | None

Execution context (propagated to Guard/Hook/Sink)

None
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
PolicyResult[T]

PolicyResult[T]: Unified result. Does not raise.
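
A minimal sketch of the "does not raise" contract, assuming local stand-ins for PolicyOutcome and PolicyResult (only a subset of the documented fields; the enum string values are guesses). PassthroughPolicy is a hypothetical name, not part of the package.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PolicyOutcome(str, Enum):
    # Member names appear in the text above; the string values are assumed.
    SUCCESS = "success"
    FAILURE = "failure"
    REJECTED = "rejected"


@dataclass
class PolicyResult(Generic[T]):
    """Local stand-in with a subset of the documented fields."""

    value: Optional[T] = None
    outcome: PolicyOutcome = PolicyOutcome.SUCCESS
    error: Optional[Exception] = None
    executed_policies: list[str] = field(default_factory=list)
    total_attempts: int = 1
    total_duration_ms: float = 0.0


class PassthroughPolicy:
    """Hypothetical policy: one attempt, no retries, never raises."""

    name = "passthrough"

    def execute(self, func: Callable[..., T], *args: Any,
                context: Any = None, **kwargs: Any) -> PolicyResult[T]:
        start = time.perf_counter()
        result: PolicyResult[T] = PolicyResult(executed_policies=[self.name])
        try:
            result.value = func(*args, **kwargs)
        except Exception as exc:  # KeyboardInterrupt/SystemExit still propagate
            result.outcome = PolicyOutcome.FAILURE
            result.error = exc
        result.total_duration_ms = (time.perf_counter() - start) * 1000.0
        return result


policy = PassthroughPolicy()
ok = policy.execute(lambda a, b: a + b, 2, 3)
failed = policy.execute(lambda: 1 / 0)  # error is captured, not raised
```

Callers branch on result.outcome (or result.error) instead of wrapping execute() in their own try/except.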

AsyncResiliencePolicy

Bases: Protocol[T]

Protocol implemented by asynchronous resilience patterns.

Current concrete implementation: AsyncSemaphoreBulkhead (async_semaphore.py). Follows the same exception-handling contract.

name property

name: str

Policy identifier.

execute async

execute(
    func: Callable[..., T],
    *args: Any,
    context: PolicyContext | None = None,
    **kwargs: Any
) -> PolicyResult[T]

Wrap an async function in the Policy and execute it.

Parameters:

Name Type Description Default
func Callable[..., T]

Async function to execute

required
*args Any

Function positional arguments

()
context PolicyContext | None

Execution context (propagated to Guard/Hook/Sink)

None
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
PolicyResult[T]

PolicyResult[T]: Unified result. Does not raise.
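
The async contract might look like the following sketch. MiniAsyncBulkhead and MiniResult are hypothetical stand-ins (this is not the real AsyncSemaphoreBulkhead, and the outcome is kept as a plain string); the rejection rule shown here, rejecting whenever no slot is free, is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class MiniResult:
    """Hypothetical stand-in for PolicyResult (outcome kept as a string)."""

    outcome: str
    value: Any = None
    error: Optional[Exception] = None


class MiniAsyncBulkhead:
    """Hypothetical semaphore bulkhead; rejects when no slot is free."""

    name = "mini_async_bulkhead"

    def __init__(self, max_concurrent: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)

    async def execute(self, func: Callable[..., Awaitable[Any]], *args: Any,
                      context: Any = None, **kwargs: Any) -> MiniResult:
        if self._sem.locked():
            # No free slot: report a rejection instead of raising.
            return MiniResult(outcome="rejected")
        async with self._sem:
            try:
                return MiniResult(outcome="success",
                                  value=await func(*args, **kwargs))
            except Exception as exc:
                return MiniResult(outcome="failure", error=exc)


async def slow_double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


async def main() -> list:
    bulkhead = MiniAsyncBulkhead(max_concurrent=1)
    # Two concurrent calls compete for a single slot.
    return await asyncio.gather(bulkhead.execute(slow_double, 21),
                                bulkhead.execute(slow_double, 1))


results = asyncio.run(main())
```

The first call takes the only slot and completes; the second finds the semaphore locked and comes back as a rejected result rather than an exception.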

PolicyGuard

Bases: Protocol

Pre-execution validation for a Policy.

Guard implementations must define the default behavior when context=None:
  • KillSwitchGuard: ignore context, only check global state
  • ErrorBudgetGuard: tier_id=None -> global decision (tier-agnostic)
  • RetryBudgetGuard: decide against the default budget

name property

name: str

Guard identifier.

check

check(context: PolicyContext | None = None) -> GuardResult

Check whether execution is allowed.

Parameters:

Name Type Description Default
context PolicyContext | None

Execution context. When None, only the global state is checked.

None

Returns:

Name Type Description
GuardResult GuardResult

allowed=True passes, allowed=False rejects.
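
As an illustration of a guard with a well-defined context=None behavior, here is a minimal sketch. StaticKillSwitchGuard is a hypothetical class (not the package's KillSwitchGuard), and GuardResult is redefined locally so the example is self-contained.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class GuardResult:
    """Local stand-in mirroring the documented GuardResult fields."""

    allowed: bool
    reason: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)


class StaticKillSwitchGuard:
    """Hypothetical guard: rejects everything while a global flag is set."""

    def __init__(self) -> None:
        self.engaged = False

    @property
    def name(self) -> str:
        return "static_kill_switch"

    def check(self, context: Any = None) -> GuardResult:
        # The context is ignored on purpose: only global state matters,
        # so context=None needs no special handling.
        if self.engaged:
            return GuardResult(allowed=False, reason="kill switch engaged")
        return GuardResult(allowed=True)


guard = StaticKillSwitchGuard()
before = guard.check()
guard.engaged = True
after = guard.check(context=None)
```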

PolicyHook

Bases: Protocol

Hook that observes Policy execution events.

Fail-Open principle: hook failures must not stop business logic.

on_execute

on_execute(
    policy_name: str,
    attempt: int,
    context: PolicyContext | None = None,
) -> None

Called when execution starts.

on_success

on_success(
    policy_name: str,
    result: PolicyResult,
    context: PolicyContext | None = None,
) -> None

Called on successful execution.

on_failure

on_failure(
    policy_name: str,
    error: Exception,
    attempt: int,
    context: PolicyContext | None = None,
) -> None

Called on failed execution.

on_retry

on_retry(
    policy_name: str,
    attempt: int,
    delay: float,
    context: PolicyContext | None = None,
) -> None

Called before a scheduled retry (not invoked on the final failure or when the budget is exhausted).

on_reject

on_reject(
    guard_name: str,
    reason: str,
    context: PolicyContext | None = None,
) -> None

Called when a Guard rejects (Kill Switch, CB open, Bulkhead full, etc.).
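
One way a caller could honor the Fail-Open principle is to isolate every hook call. The sketch below is an assumption about how dispatch might be done; notify, RecordingHook, and BrokenHook are hypothetical names, and only on_execute is implemented for brevity.

```python
import logging
from typing import Any

logger = logging.getLogger("hook_sketch")


class RecordingHook:
    """Hypothetical hook that records the events it sees."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def on_execute(self, policy_name: str, attempt: int,
                   context: Any = None) -> None:
        self.events.append(f"execute:{policy_name}:{attempt}")


class BrokenHook:
    """Hypothetical hook whose callback always fails."""

    def on_execute(self, policy_name: str, attempt: int,
                   context: Any = None) -> None:
        raise RuntimeError("metrics backend unavailable")


def notify(hooks: list, event: str, *args: Any, **kwargs: Any) -> None:
    """Call `event` on every hook, swallowing hook errors (fail-open)."""
    for hook in hooks:
        try:
            getattr(hook, event)(*args, **kwargs)
        except Exception:
            # A failing hook is logged and skipped; later hooks still run.
            logger.warning("hook %s failed in %s", type(hook).__name__, event)


recorder = RecordingHook()
notify([BrokenHook(), recorder], "on_execute", "retry", 1)
```

The broken hook's exception is logged and dropped, so the recorder after it still receives the event and the business call is never interrupted.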

FailureSink

Bases: Protocol

Interface that handles the terminal failure after every Policy is exhausted.

Performs final-failure handling such as DLQ persistence, error logging, alerts, etc.

handle_failure

handle_failure(
    error: Exception,
    context: PolicyContext | None,
    policy_result: PolicyResult,
) -> str | None

Handle the terminal failure.

Parameters:

Name Type Description Default
error Exception

Terminal failure exception

required
context PolicyContext | None

PolicyContext (order_id, user_id, etc. needed for DLQ persistence)

required
policy_result PolicyResult

Full pipeline result

required

Returns:

Type Description
str | None

Failure record ID (e.g., DLQ ID) or None
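
A minimal in-memory sink, sketched under the assumption that a sink only needs order_id and user_id from the context. InMemoryFailureSink and the record layout are hypothetical; a real DLQ sink would persist the record instead of appending to a list.

```python
import uuid
from typing import Any, Optional


class InMemoryFailureSink:
    """Hypothetical sink that keeps failure records in a list."""

    def __init__(self) -> None:
        self.records: list[dict[str, Any]] = []

    def handle_failure(self, error: Exception, context: Any,
                       policy_result: Any) -> Optional[str]:
        record_id = uuid.uuid4().hex
        self.records.append({
            "id": record_id,
            "error_type": type(error).__name__,
            "message": str(error),
            # context may be None, so read fields defensively.
            "entity_id": getattr(context, "order_id", None),
            "user_id": getattr(context, "user_id", None),
        })
        return record_id


sink = InMemoryFailureSink()
# policy_result is unused by this sketch, so None is passed for brevity.
record_id = sink.handle_failure(TimeoutError("upstream timed out"), None, None)
```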

ML strategy

AnomalyDetectionStrategy

Bases: Protocol

Anomaly detection strategy - swappable between statistics / ML / deep learning.

Built-in
  • ZScoreDetector: Z-Score based
  • IQRDetector: IQR based
Consumer extension examples
  • IsolationForestDetector: scikit-learn Isolation Forest
  • AutoencoderDetector: PyTorch Autoencoder based
  • ProphetDetector: Facebook Prophet based seasonal-aware detection
Used by
  • PredictiveForecasterService (metric anomaly detection)
  • CoOccurrenceTracker (co-occurrence frequency anomaly detection)
  • CorruptionShield L3 (data anomaly detection)

detect

detect(
    value: float, context: dict[str, Any] | None = None
) -> tuple[bool, float]

Decide whether a single value is anomalous.

Parameters:

Name Type Description Default
value float

Value to check

required
context dict[str, Any] | None

Multi-dimensional features to pass to the ML model (optional). Ignored by the built-in statistical strategies (ZScore, IQR). ML implementations extract the features they need from this dict.

None

Returns:

Type Description
tuple[bool, float]

(is_anomalous, score): anomaly verdict and anomaly score. The score does not need to be normalized; strategies choose their own scale (ZScore, probability, etc.).

update

update(
    value: float, context: dict[str, Any] | None = None
) -> None

Add a training sample (online learning).

Parameters:

Name Type Description Default
value float

New observation

required
context dict[str, Any] | None

Multi-dimensional features to pass to the ML model (optional)

None

reset

reset() -> None

Reset learning state.

get_feature_schema

get_feature_schema() -> dict[str, str] | None

Return the context key/type schema the strategy expects.

Returns:

Type Description
dict[str, str] | None

Shape like {"service_name": "str", "cpu_usage": "float", ...}. None when no schema applies (statistical strategy). Used by the orchestrator to validate inputs in advance.
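
The four methods of the protocol fit together as in the sketch below. RollingZScoreDetector is a hypothetical class written for this example (it is not the built-in ZScoreDetector); the window size, threshold, and minimum sample count are arbitrary assumptions.

```python
import math
from collections import deque
from typing import Any, Optional


class RollingZScoreDetector:
    """Hypothetical detector over a sliding window of recent values."""

    def __init__(self, window: int = 50, threshold: float = 3.0,
                 min_samples: int = 5) -> None:
        self._values: deque = deque(maxlen=window)
        self._threshold = threshold
        self._min_samples = min_samples

    def detect(self, value: float,
               context: Optional[dict[str, Any]] = None) -> tuple[bool, float]:
        if len(self._values) < self._min_samples:
            return False, 0.0  # not enough history to judge
        mean = sum(self._values) / len(self._values)
        var = sum((v - mean) ** 2 for v in self._values) / len(self._values)
        std = math.sqrt(var)
        if std == 0.0:
            # Flat history: any deviation at all is treated as unbounded.
            score = 0.0 if value == mean else math.inf
        else:
            score = abs(value - mean) / std
        return score > self._threshold, score

    def update(self, value: float,
               context: Optional[dict[str, Any]] = None) -> None:
        self._values.append(value)

    def reset(self) -> None:
        self._values.clear()

    def get_feature_schema(self) -> Optional[dict[str, str]]:
        return None  # purely statistical: no context features expected


detector = RollingZScoreDetector()
for sample in [10.0, 11.0, 9.0, 10.0, 11.0, 9.0, 10.0, 10.0]:
    detector.update(sample)

normal = detector.detect(10.5)  # well within the observed spread
spike = detector.detect(25.0)   # far outside it
```

The score here is a raw z-score, which matches the note that strategies choose their own scale.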

ForecastStrategy

Bases: Protocol

Time-series forecasting strategy.

Built-in
  • HoltLinearForecaster: double exponential smoothing
Consumer extension examples
  • ProphetForecaster: Facebook Prophet
  • LSTMForecaster: PyTorch LSTM
  • ARIMAForecaster: statsmodels ARIMA
Used by
  • PredictiveForecasterService (metric forecasting)
  • CoOccurrenceTracker (co-occurrence frequency trend)

update

update(value: float) -> float

Update the model with a new observation.

Parameters:

Name Type Description Default
value float

New observation

required

Returns:

Type Description
float

Current level (smoothed value)

predict

predict(steps_ahead: int = 1) -> float | None

Predict a future value.

Parameters:

Name Type Description Default
steps_ahead int

Number of future steps to predict

1

Returns:

Type Description
float | None

Predicted value. None when there is insufficient data.

get_confidence

get_confidence() -> float

Confidence of the current model (0.0 to 1.0).
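
For orientation, a double-exponential-smoothing forecaster that satisfies this protocol could be sketched as follows. MiniHoltForecaster is hypothetical (not the built-in HoltLinearForecaster), and the smoothing constants, the two-sample warmup, and the count-based confidence heuristic are all assumptions.

```python
from typing import Optional


class MiniHoltForecaster:
    """Hypothetical double-exponential-smoothing forecaster."""

    def __init__(self, alpha: float = 0.5, beta: float = 0.5,
                 min_samples: int = 2) -> None:
        self._alpha = alpha
        self._beta = beta
        self._min_samples = min_samples
        self._level: Optional[float] = None
        self._trend = 0.0
        self._count = 0

    def update(self, value: float) -> float:
        self._count += 1
        if self._level is None:
            self._level = value  # first observation seeds the level
            return self._level
        if self._count == 2:
            self._trend = value - self._level  # second one seeds the trend
            self._level = value
            return self._level
        prev_level = self._level
        self._level = (self._alpha * value
                       + (1 - self._alpha) * (prev_level + self._trend))
        self._trend = (self._beta * (self._level - prev_level)
                       + (1 - self._beta) * self._trend)
        return self._level

    def predict(self, steps_ahead: int = 1) -> Optional[float]:
        if self._level is None or self._count < self._min_samples:
            return None  # insufficient data
        return self._level + steps_ahead * self._trend

    def get_confidence(self) -> float:
        # Assumed heuristic: grows with sample count, capped at 1.0.
        return min(1.0, self._count / 30.0)


forecaster = MiniHoltForecaster()
cold = forecaster.predict()  # no data yet
for observation in [10.0, 12.0, 14.0, 16.0]:
    level = forecaster.update(observation)
forecast = forecaster.predict(steps_ahead=2)
```

On this perfectly linear series the level tracks the last observation and the trend stays at 2 per step, so the two-step forecast extrapolates the line.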

ClassificationStrategy

Bases: Protocol

Classification strategy - event / spike type classification.

Built-in
  • SpikeClassifier: rule-based classifier
Consumer extension examples
  • RandomForestClassifier: scikit-learn RF
  • XGBoostClassifier: XGBoost
  • NeuralClassifier: PyTorch NN
Used by
  • PredictiveForecasterService (spike classification)
  • CorrelationEngine (event pattern classification)

classify

classify(
    features: dict[str, float],
    context: dict[str, Any] | None = None,
) -> tuple[str, float]

Feature vector -> class label + confidence.

Parameters:

Name Type Description Default
features dict[str, float]

Feature name -> value mapping

required
context dict[str, Any] | None

Additional metadata to pass to the ML model (optional)

None

Returns:

Type Description
tuple[str, float]

(label, confidence): classification label and confidence (0.0 to 1.0).
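
A rule-based implementation of classify() might look like this sketch. ThresholdSpikeClassifier, its feature names, labels, thresholds, and confidence values are all invented for illustration and are not taken from SpikeClassifier.

```python
from typing import Any, Optional


class ThresholdSpikeClassifier:
    """Hypothetical rule-based classifier with invented labels."""

    def classify(self, features: dict[str, float],
                 context: Optional[dict[str, Any]] = None) -> tuple[str, float]:
        # Missing features fall back to neutral defaults.
        ratio = features.get("peak_to_baseline", 1.0)
        duration_s = features.get("duration_s", 0.0)
        if ratio < 1.5:
            return "normal", 0.9
        if duration_s < 60.0:
            return "transient_spike", 0.7
        return "sustained_surge", 0.8


classifier = ThresholdSpikeClassifier()
label, confidence = classifier.classify(
    {"peak_to_baseline": 3.0, "duration_s": 20.0}
)
```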

BatchDetectable

Bases: Protocol

Batch anomaly detection marker.

ML implementations that support tensor batch operations implement this. Statistical strategies (ZScore, IQR) do not need to implement this.

Usage
  • CorrelationEngineService (batch dispatch)
  • MicroBatchConsumer (batch dispatch)

detect_batch

detect_batch(
    values: list[float],
    contexts: list[dict[str, Any] | None] | None = None,
) -> list[tuple[bool, float]]

Batch anomaly detection.

Parameters:

Name Type Description Default
values list[float]

Values to check.

required
contexts list[dict[str, Any] | None] | None

Per-value metadata (optional).

None

Returns:

Type Description
list[tuple[bool, float]]

List of (is_anomalous, score) tuples in input order.

update_batch

update_batch(values: list[float]) -> None

Add batch training data.
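
Because batch support is optional, a caller has to decide per strategy which path to take. The sketch below shows one possible dispatch using typing.runtime_checkable; SupportsBatchDetect is a local stand-in for BatchDetectable, and detect_many, SingleOnly, and Batched are hypothetical names.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class SupportsBatchDetect(Protocol):
    """Local stand-in for BatchDetectable (detect_batch only)."""

    def detect_batch(
        self, values: list[float], contexts: Optional[list] = None
    ) -> list[tuple[bool, float]]: ...


class SingleOnly:
    """Hypothetical strategy without batch support."""

    def detect(self, value: float, context: Any = None) -> tuple[bool, float]:
        return value > 100.0, value


class Batched(SingleOnly):
    """Hypothetical strategy that also offers detect_batch."""

    def __init__(self) -> None:
        self.batch_calls = 0

    def detect_batch(self, values, contexts=None):
        self.batch_calls += 1
        return [(v > 100.0, v) for v in values]


def detect_many(strategy, values, contexts=None):
    """Use detect_batch when available, else fall back to per-value detect."""
    if isinstance(strategy, SupportsBatchDetect):
        return strategy.detect_batch(values, contexts)
    contexts = contexts or [None] * len(values)
    return [strategy.detect(v, c) for v, c in zip(values, contexts)]


single_out = detect_many(SingleOnly(), [1.0, 200.0])
batched = Batched()
batched_out = detect_many(batched, [1.0, 200.0])
```

An isinstance() check against a runtime_checkable protocol only verifies that the method exists, not its signature, so it is a capability probe rather than a type guarantee.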

BatchClassifiable

Bases: Protocol

Batch classification marker.

ML classification implementations that support vectorized batch operations implement this. Rule-based classifiers (SpikeClassifier) do not need to implement this.

Usage
  • CorrelationEngine (batch event classification)

classify_batch

classify_batch(
    features_list: list[dict[str, float]],
    contexts: list[dict[str, Any]] | None = None,
) -> list[tuple[str, float]]

Batch classification.

Parameters:

Name Type Description Default
features_list list[dict[str, float]]

List of feature dicts.

required
contexts list[dict[str, Any]] | None

Per-item metadata (optional).

None

Returns:

Type Description
list[tuple[str, float]]

List of (label, confidence) tuples in input order.

OptimizationStrategy

Bases: Protocol

Parameter optimization strategy - settings value search.

Default
  • DecisionEngine rules (rule-based)
ML implementations
  • BayesianOptimizer: Gaussian Process + Expected Improvement
  • EvolutionaryOptimizer: CMA-ES (optional, future)
Usage
  • SettingsRecommendationService (optimal value search)
  • AutoTuningService (parameter suggestion)
Note

DecisionEngine (rule-based) and OptimizationStrategy (ML-based) operate on fundamentally different paradigms. Their integration is handled by SettingsRecommendationService (374), NOT by an adapter in this package. 373 provides the ML implementation; 374 orchestrates the pipeline.

suggest

suggest(
    parameter: str,
    current_value: float,
    bounds: tuple[float, float],
    history: list[dict[str, Any]],
    objective_metric: str,
    minimize: bool = True,
) -> tuple[float, float]

Suggest optimal value for a parameter.

Parameters:

Name Type Description Default
parameter str

Parameter name to optimize.

required
current_value float

Current parameter value.

required
bounds tuple[float, float]

(min, max) allowed range.

required
history list[dict[str, Any]]

Past observations [{parameter: value, metric: value}, ...].

required
objective_metric str

Metric name to optimize (e.g., "p99_latency_ms").

required
minimize bool

If True, lower metric = better (default). If False, higher metric = better (e.g., throughput RPS).

True

Returns:

Type Description
tuple[float, float]

(suggested_value, expected_improvement): suggested value and expected gain.

suggest_batch

suggest_batch(
    parameters: list[str],
    current_values: dict[str, float],
    bounds: dict[str, tuple[float, float]],
    history: list[dict[str, Any]],
    objective_metrics: list[str],
    minimize: bool = True,
) -> dict[str, tuple[float, float]]

Suggest optimal values for multiple parameters simultaneously.

Returns:

Type Description
dict[str, tuple[float, float]]

{parameter: (suggested_value, expected_improvement)}

update_observation

update_observation(
    parameters: dict[str, float], metrics: dict[str, float]
) -> None

Record an observation (parameter values -> metric outcomes).

Used to update the internal model after applying a recommendation.
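
To make the suggest() inputs and outputs concrete, here is a deliberately naive baseline. BestObservedOptimizer is hypothetical and does no modelling: it returns the best in-bounds value already present in history. The parameter name pool_size and the history values are invented, and the sketch assumes each history entry is keyed by parameter name and metric name, as the history description suggests.

```python
from typing import Any


class BestObservedOptimizer:
    """Hypothetical baseline: suggest the best value seen so far."""

    def suggest(self, parameter: str, current_value: float,
                bounds: tuple[float, float], history: list[dict[str, Any]],
                objective_metric: str,
                minimize: bool = True) -> tuple[float, float]:
        lo, hi = bounds
        usable = [h for h in history
                  if parameter in h and objective_metric in h
                  and lo <= h[parameter] <= hi]
        if not usable:
            return current_value, 0.0  # nothing to learn from
        pick = min if minimize else max
        best = pick(usable, key=lambda h: h[objective_metric])
        at_current = [h[objective_metric] for h in usable
                      if h[parameter] == current_value]
        if not at_current:
            return best[parameter], 0.0  # no baseline to compare against
        gain = pick(at_current) - best[objective_metric]
        # Report improvement as a non-negative number in both directions.
        return best[parameter], gain if minimize else -gain


history = [
    {"pool_size": 10.0, "p99_latency_ms": 120.0},
    {"pool_size": 20.0, "p99_latency_ms": 90.0},
    {"pool_size": 40.0, "p99_latency_ms": 95.0},
    {"pool_size": 400.0, "p99_latency_ms": 10.0},  # outside bounds, ignored
]
suggestion = BestObservedOptimizer().suggest(
    parameter="pool_size", current_value=10.0, bounds=(5.0, 50.0),
    history=history, objective_metric="p99_latency_ms", minimize=True,
)
```

An ML-backed strategy would replace the lookup with a surrogate model and could propose values that were never observed; the return shape stays the same.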

StrategyLifecycle

Bases: Protocol

ML strategy lifecycle management - optional implementation.

Lightweight statistical strategies (ZScore, IQR) do not need to implement this. Heavy ML models (PyTorch, XGBoost, LLM) implement this.

Prior art
  • ShutdownHandler ABC: on_shutdown_start(), on_drain_complete()
  • ProviderRegistry.health_check_all(): unified provider health checks
  • HoltLinearForecaster: predict() -> None when warmup_samples < 30

initialize

initialize() -> None

Load model weights from disk to memory (or GPU VRAM).

Called during Orchestrator startup.

warmup

warmup() -> None

Run a first inference with dummy input.

Purpose: JIT compilation (PyTorch), CUDA kernel caching, TensorRT engine building, etc. Called after initialize() and before the first real detect().

is_ready

is_ready() -> bool

Whether the strategy is ready for inference.

Wired up to the K8s Readiness Probe. Must return an O(1) cached boolean (constrained by readinessProbe.timeoutSeconds=3).

teardown

teardown() -> None

Release resources (GPU VRAM, temporary files).

Called from GracefulShutdownCoordinator.initiate_shutdown().
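
The call order described above (initialize, warmup, is_ready, teardown) can be sketched with a toy strategy. LazyModelStrategy is hypothetical and loads no real model; the point is that is_ready() only reads a cached flag.

```python
class LazyModelStrategy:
    """Hypothetical strategy with an explicit lifecycle; no real model."""

    def __init__(self) -> None:
        self._model = None
        self._ready = False  # cached flag read by is_ready()

    def initialize(self) -> None:
        # Stand-in for loading weights; the "model" is a plain function.
        self._model = lambda x: x * 2.0

    def warmup(self) -> None:
        if self._model is None:
            raise RuntimeError("initialize() must be called before warmup()")
        self._model(0.0)  # dummy inference
        self._ready = True

    def is_ready(self) -> bool:
        return self._ready  # O(1): no I/O, no inference

    def teardown(self) -> None:
        self._model = None
        self._ready = False


strategy = LazyModelStrategy()
states = [strategy.is_ready()]
strategy.initialize()
states.append(strategy.is_ready())  # loaded but not warmed up yet
strategy.warmup()
states.append(strategy.is_ready())
strategy.teardown()
states.append(strategy.is_ready())
```

In this sketch readiness flips only after warmup(), so a probe never reports ready for a model that has been loaded but not yet exercised.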