Skip to content

baldur.interfaces — Cache, Locking & Rate Limiting

The cache-provider interface with its distributed-lock primitive and lock exceptions, plus the rate-limit storage contract and the adaptive-throttle marker.

Cache & locking

CacheProviderInterface

Bases: ABC

Abstract interface for cache/state storage.

This interface abstracts cache operations including basic get/set, atomic counters, and distributed locking.

Implementations
  • RedisCacheAdapter (current - Redis)
  • InMemoryCacheAdapter (for testing)
  • MemcachedCacheAdapter (planned)
  • DynamoDBCacheAdapter (planned - AWS serverless)
Example

cache = ProviderRegistry.get_cache()

Basic operations

cache.set("key", "value", ttl=timedelta(minutes=5)) value = cache.get("key")

Atomic counter (for rate limiting)

count = cache.incr("request_count") if count == 1: ... cache.expire("request_count", timedelta(minutes=1))

Distributed locking

with cache.get_lock("payment:process") as lock: ... process_payment()

provider_name abstractmethod property

provider_name: str

Return the provider name.

Returns:

Type Description
str

Provider identifier (e.g., 'redis', 'memcached', 'memory')

get abstractmethod

get(key: str) -> Any | None

Get value by key.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Any | None

Cached value or None if not found/expired

set abstractmethod

set(
    key: str, value: Any, ttl: timedelta | None = None
) -> bool

Set value with optional TTL.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache (must be serializable)

required
ttl timedelta | None

Time-to-live (None = no expiration)

None

Returns:

Type Description
bool

True if successful

delete abstractmethod

delete(key: str) -> bool

Delete key from cache.

Parameters:

Name Type Description Default
key str

Cache key to delete

required

Returns:

Type Description
bool

True if key existed and was deleted

exists abstractmethod

exists(key: str) -> bool

Check if key exists in cache.

Parameters:

Name Type Description Default
key str

Cache key to check

required

Returns:

Type Description
bool

True if key exists and is not expired

get_or_set

get_or_set(
    key: str,
    default_factory: Callable[[], Any],
    ttl: timedelta | None = None,
    *,
    lock_timeout: timedelta = timedelta(seconds=10),
    wait_timeout: float = 10.0
) -> Any

Get value or compute and cache it if missing, stampede-safe.

On a miss, concurrent callers coordinate so the factory runs once instead of once per caller (hot-key expiry no longer fans out N concurrent factory executions against the backend):

  • In-process, the entire miss path is funneled through a per-adapter Singleflight: one thread per process enters the distributed dance; sibling threads share its result or exception.
  • Across processes, a non-blocking distributed lock elects one winner (factory + set); losers poll the VALUE with a jittered ~0.1s cadence, retrying the non-blocking acquire so a crashed winner is replaced.
  • Fail-open: a lock/cache backend failure at any phase, or wait_timeout expiry, degrades to computing the value directly - bounded duplication, never an error or a blocked caller. The factory can therefore run more than once under winner crash/stall or backend failure; a factory whose side-effects must never execute twice has to guard itself (e.g., via IdempotencyGate).

None keeps its miss-sentinel semantics: a factory returning None is stored as before, but reads of it count as misses.

Parameters:

Name Type Description Default
key str

Cache key

required
default_factory Callable[[], Any]

Callable to compute value if missing

required
ttl timedelta | None

Time-to-live for new value

None
lock_timeout timedelta

Auto-release TTL of the per-key singleflight lock; bounds how long a crashed winner blocks takeover

timedelta(seconds=10)
wait_timeout float

Max seconds a loser waits for the winner's value before computing it anyway (match this to the factory's runtime budget)

10.0

Returns:

Type Description
Any

Cached or newly computed value

incr abstractmethod

incr(key: str, amount: int = 1) -> int

Atomically increment a counter.

Parameters:

Name Type Description Default
key str

Counter key

required
amount int

Increment amount (default 1)

1

Returns:

Type Description
int

New counter value after increment

Note

Creates key with value 0 if not exists, then increments. This is an atomic operation - safe for concurrent access.

decr abstractmethod

decr(key: str, amount: int = 1) -> int

Atomically decrement a counter.

Parameters:

Name Type Description Default
key str

Counter key

required
amount int

Decrement amount (default 1)

1

Returns:

Type Description
int

New counter value after decrement

expire abstractmethod

expire(key: str, ttl: timedelta) -> bool

Set expiration on existing key.

Parameters:

Name Type Description Default
key str

Cache key

required
ttl timedelta

Time-to-live duration

required

Returns:

Type Description
bool

True if key exists and expiration was set

ttl abstractmethod

ttl(key: str) -> int | None

Get remaining TTL in seconds.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
int | None
  • Positive int: seconds until expiration
int | None
  • None: key has no expiration
int | None
  • -2: key does not exist

setnx

setnx(
    key: str, value: Any, ttl: timedelta | None = None
) -> bool

Set value only if key does not exist (SET if Not eXists).

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to set

required
ttl timedelta | None

Optional time-to-live

None

Returns:

Type Description
bool

True if key was set (didn't exist), False otherwise

cas_dict_field

cas_dict_field(
    key: str,
    field: str,
    expected: Any,
    new_value: dict[str, Any],
    ttl: timedelta | None = None,
) -> bool

Atomic compare-and-set on a single field of a dict-valued record.

Reads the existing record at key; if it is a dict whose field equals expected, replaces the entire record with new_value (with optional TTL). Otherwise, returns False without writing.

Parameters:

Name Type Description Default
key str

Cache key holding a dict-valued record

required
field str

Field name within the dict to check

required
expected Any

Expected current value of field

required
new_value dict[str, Any]

Replacement record (full dict, not a partial update)

required
ttl timedelta | None

Optional time-to-live for the replacement record

None

Returns:

Type Description
bool

True if the record matched and was replaced, False otherwise

bool

(key missing, value not a dict, field mismatch).

Note

The base implementation is a non-atomic get → check → set two-step. Production adapters (Redis, Memory) override with atomic implementations. IdempotencyGate validates that an atomic override is in use via _validate_atomic_cas_dict_field to prevent silent inheritance of the non-atomic default.

get_lock abstractmethod

get_lock(
    name: str,
    timeout: timedelta = timedelta(seconds=10),
    blocking_timeout: float | None = None,
) -> DistributedLock

Get a distributed lock instance.

Parameters:

Name Type Description Default
name str

Lock name — the post-prefix portion of the storage key. The adapter prepends its own prefix via _make_key(name) before constructing the lock; the returned lock writes that fully resolved key verbatim. See DistributedLock Storage-key contract for the rationale and anti-pattern.

required
timeout timedelta

Lock auto-release timeout (prevents deadlocks)

timedelta(seconds=10)
blocking_timeout float | None

Max time to wait when acquiring

None

Returns:

Type Description
DistributedLock

DistributedLock instance

Example

with cache.get_lock("circuit_breaker:payment") as lock: ... # Critical section - only one process executes this ... transition_circuit_breaker_state()

Prefix semantics

cache.get_lock("foo") on a Redis adapter with default settings writes the Redis key baldur:foo (single prefix applied by _make_key). Inside TestModeContext.start() it shifts to xtest:baldur:foo (Redis-only). On Memcached / InMemory the configured static key_prefix is honored without TestModeContext awareness.

For SCAN observability of lock keys, callers should embed a lock: segment in the name (e.g., cache.get_lock("idempotency:lock:order:abc")).

Note

Always use locks with context manager to ensure release. The timeout parameter prevents deadlocks if a process crashes while holding the lock.

mget abstractmethod

mget(keys: list[str]) -> dict[str, Any]

Get multiple values at once.

Parameters:

Name Type Description Default
keys list[str]

List of cache keys

required

Returns:

Type Description
dict[str, Any]

Dict mapping keys to values (missing keys omitted)

mset abstractmethod

mset(
    mapping: dict[str, Any], ttl: timedelta | None = None
) -> bool

Set multiple values at once.

Parameters:

Name Type Description Default
mapping dict[str, Any]

Key-value pairs to set

required
ttl timedelta | None

Optional TTL for all keys

None

Returns:

Type Description
bool

True if successful

mdelete

mdelete(keys: list[str]) -> int

Delete multiple keys at once.

Parameters:

Name Type Description Default
keys list[str]

List of cache keys to delete

required

Returns:

Type Description
int

Number of keys that were deleted

hget

hget(name: str, key: str) -> Any | None

Get a field from a hash.

Parameters:

Name Type Description Default
name str

Hash name

required
key str

Field key within the hash

required

Returns:

Type Description
Any | None

Field value or None

hset

hset(name: str, key: str, value: Any) -> bool

Set a field in a hash.

Parameters:

Name Type Description Default
name str

Hash name

required
key str

Field key within the hash

required
value Any

Field value

required

Returns:

Type Description
bool

True if successful

hgetall

hgetall(name: str) -> dict[str, Any]

Get all fields from a hash.

Parameters:

Name Type Description Default
name str

Hash name

required

Returns:

Type Description
dict[str, Any]

Dict of all fields and values

push_limit

push_limit(
    key: str,
    value: Any,
    max_len: int,
    ttl: timedelta | None = None,
) -> int

Append value to a list and trim to max_len (oldest entries dropped).

Default implementation uses RMW via get()/set(). Redis and InMemory adapters override with native atomic operations.

Parameters:

Name Type Description Default
key str

List key

required
value Any

Value to append (must be serializable)

required
max_len int

Maximum list length (oldest entries trimmed)

required
ttl timedelta | None

Time-to-live for the key (renewed on each call)

None

Returns:

Type Description
int

Pre-trim length (length after push, before trim).

int

max_len means trim occurred.

list_range

list_range(key: str, start: int, end: int) -> list[Any]

Return elements from start to end (inclusive) of a list.

Default implementation uses get() + slice. Redis and InMemory adapters override with native operations.

Parameters:

Name Type Description Default
key str

List key

required
start int

Start index (0-based, negative supported)

required
end int

End index (inclusive, -1 means last element)

required

Returns:

Type Description
list[Any]

List of elements in the specified range

health_check abstractmethod

health_check() -> bool

Check if cache backend is reachable.

Returns:

Type Description
bool

True if healthy and connected

flush_all abstractmethod

flush_all() -> bool

Clear all keys (USE WITH CAUTION - mainly for testing).

Returns:

Type Description
bool

True if successful

Warning

This will delete ALL data in the cache. Only use in testing environments or with explicit confirmation.

ping

ping() -> bool

Simple connectivity check.

Returns:

Type Description
bool

True if connection is alive

keys

keys(pattern: str = '*') -> list[str]

Find keys matching a pattern.

Parameters:

Name Type Description Default
pattern str

Glob-style pattern (e.g., "circuit_breaker:*")

'*'

Returns:

Type Description
list[str]

List of matching keys

Warning

Use with caution in production - may be slow with many keys. Default implementation returns empty list.

scan

scan(
    pattern: str = "*", count: int = 100
) -> tuple[int, list[str]]

Incrementally iterate keys matching a pattern.

Parameters:

Name Type Description Default
pattern str

Glob-style pattern

'*'
count int

Approximate number of keys per iteration

100

Returns:

Type Description
tuple[int, list[str]]

Tuple of (cursor, keys) - cursor 0 means scan complete

Note

Default implementation returns (0, []). Override for implementations that support scanning.

DistributedLock

Bases: ABC

Distributed lock interface for cross-process synchronization.

Used by CircuitBreaker for state transitions and other critical sections that require mutual exclusion across multiple processes or servers.

Supports context manager protocol for safe usage:

with cache.get_lock("circuit_breaker:payment") as lock:
    # Critical section - only one process can execute
    circuit_breaker.transition_state()
Implementations
  • RedisDistributedLock (Redis-based)
  • InMemoryLock (for testing - single process only)
Storage-key contract

Lock implementations MUST treat the constructor's full_key argument as the verbatim storage key. The owning adapter (cache.get_lock(name)) is the single point that resolves the user-supplied lock name into a full key by routing it through its own _make_key() (which honors key_prefix, TestModeContext, and NamespaceSettings — Redis only). The lock writes full_key directly to storage.

Anti-pattern: lock implementations MUST NOT hardcode any prefix segment (e.g., f"lock:{name}") inside __init__. Earlier RedisDistributedLock versions did exactly this, producing double-prefixed keys (baldur:lock:idempotency:lock:order:abc) and bypassing the adapter's prefix system entirely. New adapter implementations (DynamoDB / etcd / ZooKeeper) must follow the contract: full key in, full key written, no in-class transformation.

SCAN-observability convention: callers that want a recognizable lock: segment in storage should embed it in the name passed to cache.get_lock() (e.g., cache.get_lock("idempotency:lock:order:abc")) rather than relying on the lock class to add it.

Lifecycle: lock construction, acquire, and release should occur within a single TestModeContext scope. The natural with cache.get_lock(name) as lock: pattern enforces this. Crossing a context boundary between construction and acquire leaves the lock in the construction-time namespace — safe (no orphan) but breaks synthetic isolation for that one lock.

acquire abstractmethod

acquire(
    blocking: bool = True, timeout: float | None = None
) -> bool

Acquire the lock.

Parameters:

Name Type Description Default
blocking bool

If True, block until lock is acquired

True
timeout float | None

Max seconds to wait (None = infinite)

None

Returns:

Type Description
bool

True if lock was acquired, False otherwise

Note

If blocking=False and lock is held, returns False immediately. If blocking=True and timeout expires, returns False.

release abstractmethod

release() -> None

Release the lock.

Raises:

Type Description
LockNotOwnedError

If lock is not held by current owner

locked abstractmethod

locked() -> bool

Check if lock is currently held by anyone.

Returns:

Type Description
bool

True if lock is held, False if available

owned abstractmethod

owned() -> bool

Check if lock is held by current owner.

Returns:

Type Description
bool

True if lock is held by this instance

extend

extend(additional_time: timedelta) -> bool

Extend the lock's TTL.

Parameters:

Name Type Description Default
additional_time timedelta

Time to add to current TTL

required

Returns:

Type Description
bool

True if extension was successful

Note

Default implementation returns False (not supported). Override in implementations that support TTL extension.

LockAcquisitionError

LockAcquisitionError(message: str = '', *, code: str = '')

Bases: BaldurError

Raised when lock acquisition fails.

LockNotOwnedError

LockNotOwnedError(message: str = '', *, code: str = '')

Bases: BaldurError

Raised when trying to release a lock not owned by current instance.

generate_lock_owner_id

generate_lock_owner_id() -> str

Standard lock owner ID for all DistributedLock implementations.

Rate limiting & throttle

RateLimitStorageType

Bases: str, Enum

Type of rate limit storage backend.

RateLimitState dataclass

RateLimitState(
    key: str,
    cooldown_until: float = 0.0,
    consecutive_429s: int = 0,
    last_updated: float = 0.0,
)

Rate limit state for a specific endpoint/service.

Attributes:

Name Type Description
key str

Unique identifier (e.g., "payment_api", "external_service")

cooldown_until float

Unix timestamp when cooldown ends (0 = no cooldown)

consecutive_429s int

Number of consecutive 429 responses

last_updated float

Unix timestamp of last state update

is_in_cooldown property

is_in_cooldown: bool

Check if currently in cooldown period.

remaining_cooldown property

remaining_cooldown: float

Get remaining cooldown time in seconds.

RateLimitStorageInterface

Bases: ABC

Abstract interface for distributed rate limit state storage.

Implementations must be thread-safe and support atomic operations.

Usage

storage = get_rate_limit_storage()

On 429 response

storage.set_cooldown("payment_api", cooldown_until=time.time() + 60) storage.increment_consecutive_429s("payment_api")

Before making request

state = storage.get_state("payment_api") if state.is_in_cooldown: time.sleep(state.remaining_cooldown)

Implementations
  • RedisRateLimitStorage (fastest, requires Redis)
  • DatabaseRateLimitStorage (100% compatible, slightly slower)
  • InMemoryRateLimitStorage (single process only, for testing)

storage_type abstractmethod property

storage_type: RateLimitStorageType

Return the type of storage backend.

get_state abstractmethod

get_state(key: str) -> RateLimitState

Get the current rate limit state for a key.

Parameters:

Name Type Description Default
key str

Unique identifier for the rate-limited resource

required

Returns:

Type Description
RateLimitState

RateLimitState with current cooldown info

Note

Returns a default state (no cooldown) if key doesn't exist.

set_cooldown abstractmethod

set_cooldown(
    key: str, cooldown_until: float, ttl: int | None = None
) -> None

Set the cooldown end time for a key.

Parameters:

Name Type Description Default
key str

Unique identifier for the rate-limited resource

required
cooldown_until float

Unix timestamp when cooldown should end

required
ttl int | None

Time-to-live in seconds (for cleanup)

None
Note

This should be an atomic operation to prevent race conditions.

increment_consecutive_429s abstractmethod

increment_consecutive_429s(key: str) -> int

Atomically increment the consecutive 429 counter.

Parameters:

Name Type Description Default
key str

Unique identifier for the rate-limited resource

required

Returns:

Type Description
int

New counter value after increment

Note

Used for exponential backoff calculation.

reset_consecutive_429s abstractmethod

reset_consecutive_429s(key: str) -> None

Reset the consecutive 429 counter on successful request.

Parameters:

Name Type Description Default
key str

Unique identifier for the rate-limited resource

required

clear abstractmethod

clear(key: str) -> None

Clear all rate limit state for a key.

Parameters:

Name Type Description Default
key str

Unique identifier for the rate-limited resource

required

is_available

is_available() -> bool

Check if the storage backend is available.

Returns:

Type Description
bool

True if the storage is operational

Note

Used for fallback detection. Default returns True.

RateLimitStorageError

RateLimitStorageError(message: str = '', *, code: str = '')

Bases: AdapterError

Base exception for rate limit storage errors.

RateLimitStorageUnavailableError

RateLimitStorageUnavailableError(
    message: str = "", *, code: str = ""
)

Bases: RateLimitStorageError

Raised when storage backend is unavailable.

AdaptiveThrottle

Bases: Protocol

Protocol for the PRO adaptive throttle singleton.