baldur.decorators — Resilience Decorators

Opinionated @protected presets and orthogonal gates. The primitives @protected / @aprotected live at the top level; this module hosts preset compositions (@dlq_protect) and orthogonal call-site gates (@idempotent, @rate_limit, @domain_tag).

See also

Django quickstart — end-to-end decorator wiring on a real example app.

Decorators

dlq_protect

dlq_protect(
    name: str,
    *,
    fallback: (
        Callable[[], Any]
        | Callable[[], Awaitable[Any]]
        | None
    ) = None,
    timeout: float | None = _TIMEOUT_UNSET,
    context_from: (
        Callable[..., PolicyContext] | None | Literal[False]
    ) = _CONTEXT_FROM_UNSET
) -> Callable[[Callable[..., T]], Callable[..., T]]

PRO-aliased @protected with zero-message-loss defaults pinned.

Equivalent to @protected(name, dlq=True, retry=True, circuit_breaker=True) but communicates the PRO value proposition at the decoration site.

Parameters:

name (str, required)
    Identifier used for metrics/logging (passed to protect()).

fallback (Callable[[], Any] | Callable[[], Awaitable[Any]] | None, default: None)
    Optional fallback callable invoked when all branches fail.

timeout (float | None, default: _TIMEOUT_UNSET)
    Per-call timeout in seconds. Omit to use ProtectSettings defaults; pass None to disable.

context_from (Callable[..., PolicyContext] | None | Literal[False], default: _CONTEXT_FROM_UNSET)
    Forwarded to @protected. When omitted, it behaves as None, which enables auto-extraction: order_id / user_id and the full primitive payload are read from the wrapped function's arguments and populate the DLQ entry without manual wiring. Pass False at privacy-sensitive call sites (e.g., @dlq_protect("auth.verify_password", context_from=False)) to skip capture. Pass a Callable[..., PolicyContext] for custom extraction logic.

Returns:

Callable[[Callable[..., T]], Callable[..., T]]
    Decorator that auto-detects sync vs async and dispatches accordingly.

Usage

@dlq_protect("orders.charge")
def charge(order_id: int) -> None:
    ...

@dlq_protect("orders.charge_async")
async def charge_async(order_id: int) -> None:
    ...
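
The timeout and context_from defaults are sentinels, which is what lets the decorator tell "argument omitted" apart from an explicit None. A minimal sketch of that pattern, assuming nothing about baldur's internals: _UNSET, DEFAULT_TIMEOUT, and resolve_timeout are hypothetical names used only for illustration.

```python
from __future__ import annotations

# Hypothetical stand-ins; baldur's own sentinel and settings objects may differ.
_UNSET = object()
DEFAULT_TIMEOUT = 30.0  # assumed settings-level default, for illustration only


def resolve_timeout(timeout: float | None | object = _UNSET) -> float | None:
    """Return the effective timeout for a call."""
    if timeout is _UNSET:
        # Argument omitted: fall back to the configured default.
        return DEFAULT_TIMEOUT
    # Explicit None disables the timeout; a number is used as given.
    return timeout  # type: ignore[return-value]
```

Using None itself as the default would make "use the configured default" and "disable the timeout" indistinguishable, which is why a separate sentinel object is needed.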

idempotent

idempotent(
    *,
    domain: (
        IdempotencyDomain | str
    ) = IdempotencyDomain.CUSTOM,
    key_args: list[str] | None = None,
    key_fn: (
        Callable[..., str | IdempotencyKey] | None
    ) = None,
    operation: str | None = None,
    ttl: timedelta | None = None,
    execution_ttl: timedelta | None = None
) -> Callable[[Callable[..., T]], Callable[..., T]]

Atomic do-not-run-twice decorator.

Parameters:

domain (IdempotencyDomain | str, default: IdempotencyDomain.CUSTOM)
    IdempotencyDomain (or its string value) used to namespace the cache key.

key_args (list[str] | None, default: None)
    List of parameter names to extract from the wrapped function's signature. The cache key combines the domain, the per-operation component (see operation), and the extracted values, so two different operations sharing a domain and key values do not consume each other's dedup verdicts. Only primitive types are allowed (int/str/bool/UUID/etc.); see _ALLOWED_KEY_ARG_TYPES. Mutually exclusive with key_fn.

key_fn (Callable[..., str | IdempotencyKey] | None, default: None)
    A (*args, **kwargs) -> str | IdempotencyKey callable that builds the key. This is the full-control escape hatch: no per-operation component is added. Mutually exclusive with key_args.

operation (str | None, default: None)
    Per-operation key component for the key_args form. Defaults to the decorated function's module.qualname, so each function gets its own dedup key space with zero configuration. Caveat: the default identity is code-derived, so renaming or moving the function resets that operation's dedup memory at the deploy. Set operation= explicitly for correctness-critical operations, and give two entry points the same label when they guard one logical operation (e.g. an HTTP handler and a worker both guarding the same charge). Must not contain : or | (the key separators). Not accepted with key_fn (the key is used verbatim).

ttl (timedelta | None, default: None)
    Dedup memory window: how long a completed (or failed) operation is remembered, i.e. how long duplicates stay blocked after success. None uses the gate memory default (BALDUR_IDEMPOTENCY_GATE_MEMORY_TTL_SECONDS, 30 minutes unless tuned).

execution_ttl (timedelta | None, default: None)
    In-flight execution window: how long a running claim is honored before a crashed attempt becomes retryable (and before a competing process may take the key over). None uses the gate execution default (30 minutes). Set execution_ttl >= the worst-case runtime of the wrapped function; a value below it risks a concurrent duplicate run via stale takeover.

Returns:

Callable[[Callable[..., T]], Callable[..., T]]
    Decorator that auto-detects sync vs async.

Raises (at decoration time):

TypeError
    If both key_args and key_fn are supplied; if neither is supplied; if an annotated key_args parameter is non-primitive; if operation is empty, contains a separator, or is combined with key_fn; or if ttl / execution_ttl is not a positive timedelta.

Raises (at call time):

IdempotencyDuplicateError
    On SKIP (already completed) or ABORT (in-flight collision).

IdempotencyUnavailableError
    When the cache is unavailable (e.g. Redis down) during the check and fail_open_on_cache_error is False (the default fail-closed posture). Opting in to fail-open treats the unverifiable check as CONTINUE instead.

TypeError
    If an unannotated key_args value resolves to a non-primitive at runtime, or if key_fn returns an unsupported type.
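
The interplay of ttl, execution_ttl, and the SKIP / ABORT / CONTINUE verdicts can be sketched with a toy in-memory gate. This is an illustration of the described semantics only, not the library's gate: InMemoryGate, Claim, and the check / complete methods are hypothetical, and a production gate needs an atomic shared store rather than a per-process dict.

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass
class Claim:
    state: str         # "running" or "done"
    expires_at: float  # deadline after which this record is ignored


class InMemoryGate:
    """Toy single-process gate; hypothetical, for illustration only."""

    def __init__(self, ttl: float, execution_ttl: float) -> None:
        self.ttl = ttl                      # dedup memory window (seconds)
        self.execution_ttl = execution_ttl  # in-flight window (seconds)
        self._claims: dict[str, Claim] = {}

    def check(self, key: str, now: float | None = None) -> str:
        now = time.monotonic() if now is None else now
        claim = self._claims.get(key)
        if claim is not None and claim.expires_at > now:
            # Live record: completed work is skipped, running work collides.
            return "SKIP" if claim.state == "done" else "ABORT"
        # No record, or it expired (stale takeover): claim the key.
        self._claims[key] = Claim("running", now + self.execution_ttl)
        return "CONTINUE"

    def complete(self, key: str, now: float | None = None) -> None:
        now = time.monotonic() if now is None else now
        # Remember the completion for the dedup memory window.
        self._claims[key] = Claim("done", now + self.ttl)
```

In this sketch a running claim that outlives execution_ttl is taken over by the next caller, which is the reason execution_ttl should cover the worst-case runtime of the wrapped function.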

Usage

@idempotent(domain=IdempotencyDomain.EXTERNAL_SERVICE, key_args=["order_id"])
def charge(order_id: int) -> None:
    ...

@idempotent(key_fn=lambda payload: payload["request_id"])
async def handle(payload: dict) -> None:
    ...

@idempotent(
    domain=IdempotencyDomain.EXTERNAL_SERVICE,
    key_args=["order_id"],
    operation="billing.charge",   # shared label across entry points
    ttl=timedelta(hours=2),       # remember completions for 2 h
    execution_ttl=timedelta(minutes=5),  # worst-case runtime bound
)
def charge_from_worker(order_id: int) -> None:
    ...
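
Why a shared operation label matters for the key_args form can be shown with a small key-composition sketch. The layout below (domain:operation:values) is an assumption made for illustration; this reference only states that : and | are the key separators. build_key is a hypothetical helper, not part of baldur.

```python
from __future__ import annotations

from typing import Any

_SEPARATORS = (":", "|")  # the key separators named in this reference


def build_key(domain: str, operation: str, values: list[Any]) -> str:
    """Compose a dedup key; the exact layout here is an assumption."""
    if not operation or any(sep in operation for sep in _SEPARATORS):
        # Mirrors the documented decoration-time rejection of bad labels.
        raise TypeError(f"invalid operation label: {operation!r}")
    joined = "|".join(str(v) for v in values)
    return f"{domain}:{operation}:{joined}"
```

Two entry points that pass the same operation label and the same order_id produce the same key and therefore deduplicate against each other; two functions left on their default, code-derived labels do not.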

rate_limit

rate_limit(
    *,
    max_requests: int,
    window_seconds: int = 60,
    key_fn: Callable[..., str] | None = None,
    raise_on_limit: bool = True
) -> Callable[[Callable[..., T]], Callable[..., T]]

Function-level sliding-window rate limiter.

Parameters:

max_requests (int, required)
    Maximum requests allowed within window_seconds. Omitting it raises TypeError at decoration time.

window_seconds (int, default: 60)
    Sliding-window size in seconds.

key_fn (Callable[..., str] | None, default: None)
    Optional (*args, **kwargs) -> str callable that derives the per-call bucket key. When None, a single shared bucket keyed by func.__qualname__ is used.

raise_on_limit (bool, default: True)
    When True, raise RateLimitExceeded on rejection. When False, return None on rejection (the wrapped function's value is returned on allowed paths).

Returns:

Callable[[Callable[..., T]], Callable[..., T]]
    Decorator that auto-detects sync vs async.

Raises (at call time):

RateLimitExceeded
    When raise_on_limit=True and the limiter rejects the call.

Usage

@rate_limit(max_requests=10, window_seconds=60)
def search(q: str) -> list[str]:
    ...

@rate_limit(max_requests=5, window_seconds=1, key_fn=lambda user_id: f"u:{user_id}")
async def fetch(user_id: int) -> dict:
    ...
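
For readers unfamiliar with the technique, a sliding-window limiter with per-key buckets can be sketched as follows. This is a toy in-process version that keeps timestamps in a deque; SlidingWindowLimiter and allow are hypothetical names, and the library's storage backend is not modeled.

```python
from __future__ import annotations

import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Toy in-process limiter; hypothetical, for illustration only."""

    def __init__(self, max_requests: int, window_seconds: int = 60) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: float | None = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window.
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False  # rejected calls are not recorded
        hits.append(now)
        return True
```

Each distinct key gets its own bucket, which corresponds to supplying key_fn; with a single constant key every caller shares one budget, which corresponds to the default shared bucket.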

domain_tag

domain_tag(domain: str) -> Callable[[F], F]

Domain-tagging decorator.

Sets the domain context while the wrapped function executes and restores the previous context on exit (both sync and async callables are supported).

Parameters:

domain (str, required)
    Domain name to apply for the duration of the call.

Returns:

Callable[[F], F]
    Decorator that wraps the target callable.

Usage

@domain_tag("payment")
def process_payment():
    # Errors raised here are tagged with the "payment" domain.
    ...

@domain_tag("order")
async def create_order():
    # Async functions are supported.
    ...

Reference

docs/baldur/middleware_system/75_CRISIS_BUDGET_MULTIPLIER.md §0.1 (item 6)

Context manager

DomainContext

DomainContext(domain: str)

Domain context manager.

Sets the domain for the enclosed code block via with. On block exit, the previous domain context is restored automatically.

Usage

with DomainContext("payment"):
    # The domain is "payment" inside this block.
    process_payment()

The previous domain is restored on exit.
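
The set-then-restore behavior, including nesting, can be sketched with the standard library's contextvars module. Whether baldur stores the domain in a ContextVar is an assumption; DomainScope, current_domain, and _current_domain are hypothetical stand-ins for illustration.

```python
from __future__ import annotations

from contextvars import ContextVar

# Hypothetical module-level variable; baldur's internal storage may differ.
_current_domain: ContextVar[str | None] = ContextVar("domain", default=None)


class DomainScope:
    """Set a domain for a with-block and restore the previous one on exit."""

    def __init__(self, domain: str) -> None:
        self.domain = domain
        self._token = None

    def __enter__(self) -> "DomainScope":
        # set() returns a token that remembers the previous value.
        self._token = _current_domain.set(self.domain)
        return self

    def __exit__(self, *exc_info: object) -> None:
        # reset() restores whatever value was active before __enter__,
        # including when the block exits with an exception.
        _current_domain.reset(self._token)


def current_domain() -> str | None:
    return _current_domain.get()
```

Because the restore uses the token rather than a hard-coded None, an inner scope hands control back to the outer scope's domain instead of clearing it.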

Attributes:

domain
    Domain name to apply for the block.

Initialize a DomainContext.

Parameters:

domain (str, required)
    Domain name to apply.

Helpers

get_current_domain

get_current_domain() -> str | None

Return the current domain from the active context.

Returns:

str | None
    Current domain name, or None if no context is set.

Usage

@domain_tag("payment")
def process_payment():
    domain = get_current_domain()
    print(f"Current domain: {domain}")  # "payment"

clear_domain_context

clear_domain_context() -> None

Clear the current domain context.

Primarily for test cleanup. In application code, DomainContext and @domain_tag clean up automatically on scope exit.

Exceptions raised by the decorators above

IdempotencyDuplicateError

IdempotencyDuplicateError(
    message: str = "",
    *,
    key: str = "",
    domain: str = "",
    decision: str = ""
)

Bases: BaldurError

Raised by @idempotent on a detected duplicate or in-flight collision.

Inherits BaldurError directly (correctness contract, not a resilience stage). Non-retryable by default — outer @dlq_protect layers should treat this as a terminal signal.
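
What "terminal signal" means for an outer retry layer can be sketched as a retry loop that re-raises one exception type immediately. This is not how @dlq_protect is implemented; TerminalError stands in for IdempotencyDuplicateError, and call_with_retry is a hypothetical helper.

```python
from __future__ import annotations

from typing import Callable, TypeVar

T = TypeVar("T")


class TerminalError(Exception):
    """Stand-in for a non-retryable signal such as a detected duplicate."""


def call_with_retry(func: Callable[[], T], attempts: int = 3) -> T:
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Exception | None = None
    for _ in range(attempts):
        try:
            return func()
        except TerminalError:
            # Retrying a duplicate cannot succeed; surface it immediately.
            raise
        except Exception as exc:
            # Treated as transient in this sketch: remember it and try again.
            last_error = exc
    assert last_error is not None
    raise last_error
```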

RateLimitExceeded

RateLimitExceeded(
    message: str = "",
    *,
    key: str = "",
    limit: int = 0,
    window_seconds: int = 0,
    reset_at: int = 0
)

Bases: ResilienceError

Raised by @rate_limit when a call is rejected by the limiter.

Function-level rejection signal — distinct from RateLimitStorageError (storage-backend failure).
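
A caller that catches the rejection may want to wait until the window reopens. The sketch below uses a local stand-in class that mirrors the documented constructor, and it assumes two things this reference does not confirm: that the keyword arguments are exposed as attributes, and that reset_at is a Unix timestamp in seconds. seconds_until_reset is a hypothetical helper.

```python
from __future__ import annotations

import time


class RateLimitExceededStandIn(Exception):
    """Local stand-in mirroring the documented constructor fields."""

    def __init__(self, message: str = "", *, key: str = "", limit: int = 0,
                 window_seconds: int = 0, reset_at: int = 0) -> None:
        super().__init__(message)
        self.key = key
        self.limit = limit
        self.window_seconds = window_seconds
        self.reset_at = reset_at


def seconds_until_reset(exc: RateLimitExceededStandIn,
                        now: float | None = None) -> float:
    """Assumes reset_at is a Unix timestamp in seconds (not confirmed here)."""
    now = time.time() if now is None else now
    # Never return a negative wait if the reset time has already passed.
    return max(0.0, exc.reset_at - now)
```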

DomainValidationError

DomainValidationError(
    message: str = "",
    *,
    original_domain: str = "",
    reason: Any = None
)

Bases: BaldurError

Raised when a domain input string fails validation.

Carries the original (pre-normalization) input and a typed reject reason for downstream logging / metric labelling.

Modeled on RecoveryAdapterError: it is raised only at validation sites where a loud failure is appropriate, such as decoration time, where the problem surfaces in development or CI and can be fixed by a test or a rename. Runtime APIs catch this error and fall back to FALLBACK_DOMAIN instead of propagating it.
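
The split between a loud decoration-time failure and a quiet runtime fallback can be sketched as two call paths over one validator. Everything here is a stand-in: the validation rule (strip, lowercase, reject empty), the fallback value, and the names validate_domain and resolve_runtime_domain are assumptions for illustration, since this reference does not list the real rules or the value of FALLBACK_DOMAIN.

```python
from __future__ import annotations

from typing import Any

# Hypothetical value; the real FALLBACK_DOMAIN is not given in this reference.
FALLBACK_DOMAIN_STANDIN = "unknown"


class DomainValidationErrorStandIn(Exception):
    """Local stand-in mirroring the documented constructor fields."""

    def __init__(self, message: str = "", *, original_domain: str = "",
                 reason: Any = None) -> None:
        super().__init__(message)
        self.original_domain = original_domain  # pre-normalization input
        self.reason = reason


def validate_domain(raw: str) -> str:
    """Loud path. Assumed rule for illustration: strip, lowercase, reject empty."""
    normalized = raw.strip().lower()
    if not normalized:
        raise DomainValidationErrorStandIn(
            "empty domain", original_domain=raw, reason="empty"
        )
    return normalized


def resolve_runtime_domain(raw: str) -> str:
    """Runtime path: swallow the validation error and use the fallback."""
    try:
        return validate_domain(raw)
    except DomainValidationErrorStandIn:
        return FALLBACK_DOMAIN_STANDIN
```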