baldur.interfaces — Cache, Locking & Rate Limiting
The cache-provider interface with its distributed-lock primitive and lock exceptions, plus the rate-limit storage contract and the adaptive-throttle marker.
Cache & locking
CacheProviderInterface
Bases: ABC
Abstract interface for cache/state storage.
This interface abstracts cache operations including basic get/set, atomic counters, and distributed locking.
Implementations
- RedisCacheAdapter (current - Redis)
- InMemoryCacheAdapter (for testing)
- MemcachedCacheAdapter (planned)
- DynamoDBCacheAdapter (planned - AWS serverless)
Example
cache = ProviderRegistry.get_cache()
Basic operations
cache.set("key", "value", ttl=timedelta(minutes=5))
value = cache.get("key")
Atomic counter (for rate limiting)
count = cache.incr("request_count")
if count == 1:
    cache.expire("request_count", timedelta(minutes=1))
Distributed locking
with cache.get_lock("payment:process") as lock:
    process_payment()
provider_name
abstractmethod
property
provider_name: str
Return the provider name.
Returns:
| Type | Description |
|---|---|
| str | Provider identifier (e.g., 'redis', 'memcached', 'memory') |
get
abstractmethod
get(key: str) -> Any | None
Get value by key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key | required |
Returns:
| Type | Description |
|---|---|
| Any \| None | Cached value or None if not found/expired |
set
abstractmethod
set(
key: str, value: Any, ttl: timedelta | None = None
) -> bool
Set value with optional TTL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key | required |
| value | Any | Value to cache (must be serializable) | required |
| ttl | timedelta \| None | Time-to-live (None = no expiration) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if successful |
delete
abstractmethod
delete(key: str) -> bool
Delete key from cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key to delete | required |
Returns:
| Type | Description |
|---|---|
| bool | True if key existed and was deleted |
exists
abstractmethod
exists(key: str) -> bool
Check if key exists in cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key to check | required |
Returns:
| Type | Description |
|---|---|
| bool | True if key exists and is not expired |
get_or_set
get_or_set(
key: str,
default_factory: Callable[[], Any],
ttl: timedelta | None = None,
*,
lock_timeout: timedelta = timedelta(seconds=10),
wait_timeout: float = 10.0
) -> Any
Get value or compute and cache it if missing, stampede-safe.
On a miss, concurrent callers coordinate so the factory runs once instead of once per caller (hot-key expiry no longer fans out N concurrent factory executions against the backend):
- In-process, the entire miss path is funneled through a per-adapter Singleflight: one thread per process enters the distributed dance; sibling threads share its result or exception.
- Across processes, a non-blocking distributed lock elects one winner (factory + set); losers poll the VALUE with a jittered ~0.1s cadence, retrying the non-blocking acquire so a crashed winner is replaced.
- Fail-open: a lock/cache backend failure at any phase, or wait_timeout expiry, degrades to computing the value directly: bounded duplication, never an error or a blocked caller. The factory can therefore run more than once under winner crash/stall or backend failure; a factory whose side effects must never execute twice has to guard itself (e.g., via IdempotencyGate).
None keeps its miss-sentinel semantics: a factory returning None is stored as before, but reads of it count as misses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key | required |
| default_factory | Callable[[], Any] | Callable to compute value if missing | required |
| ttl | timedelta \| None | Time-to-live for new value | None |
| lock_timeout | timedelta | Auto-release TTL of the per-key singleflight lock; bounds how long a crashed winner blocks takeover | timedelta(seconds=10) |
| wait_timeout | float | Max seconds a loser waits for the winner's value before computing it anyway (match this to the factory's runtime budget) | 10.0 |
Returns:
| Type | Description |
|---|---|
| Any | Cached or newly computed value |
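The in-process half of the coordination described above can be sketched as follows. This is a minimal illustration, not the adapter's actual Singleflight: the class layout, the `do()` method, and the `followers()` helper (added only so the demo can wait deterministically) are all assumptions. It omits the cross-process lock, the polling loop, and the fail-open path.

```python
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class _Call:
    """Book-keeping for one in-flight factory execution."""

    def __init__(self) -> None:
        self.done = threading.Event()
        self.value: Any = None
        self.error: Optional[BaseException] = None
        self.followers = 0


class Singleflight:
    """Hypothetical coalescer: one leader per key runs the factory."""

    def __init__(self) -> None:
        self._mutex = threading.Lock()
        self._calls: Dict[str, _Call] = {}

    def followers(self, key: str) -> int:
        """Number of threads currently waiting on the leader (demo helper)."""
        with self._mutex:
            call = self._calls.get(key)
            return call.followers if call else 0

    def do(self, key: str, factory: Callable[[], Any]) -> Any:
        with self._mutex:
            call = self._calls.get(key)
            leader = call is None
            if leader:
                call = self._calls[key] = _Call()
            else:
                call.followers += 1
        if leader:
            try:
                call.value = factory()
            except BaseException as exc:  # shared with followers, re-raised below
                call.error = exc
            finally:
                with self._mutex:
                    del self._calls[key]
                call.done.set()
        else:
            call.done.wait()
        if call.error is not None:
            raise call.error
        return call.value


def demo() -> Tuple[int, List[str]]:
    """Four threads miss the same key; the factory should run once."""
    sf = Singleflight()
    gate = threading.Event()
    runs: List[int] = []
    results: List[str] = []

    def factory() -> str:
        runs.append(1)
        gate.wait(timeout=5)  # hold the leader until followers have queued
        return "expensive"

    threads = [
        threading.Thread(target=lambda: results.append(sf.do("hot-key", factory)))
        for _ in range(4)
    ]
    for t in threads:
        t.start()
    deadline = time.monotonic() + 5
    while sf.followers("hot-key") < 3 and time.monotonic() < deadline:
        time.sleep(0.001)
    gate.set()
    for t in threads:
        t.join()
    return len(runs), results
```

Followers share the leader's exception as well as its value, matching the "share its result or exception" behaviour described above. The sketch does not cache anything; it only collapses concurrent calls.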
incr
abstractmethod
incr(key: str, amount: int = 1) -> int
Atomically increment a counter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Counter key | required |
| amount | int | Increment amount (default 1) | 1 |
Returns:
| Type | Description |
|---|---|
| int | New counter value after increment |
Note
If the key does not exist, it is created with value 0 and then incremented. The operation is atomic and safe for concurrent access.
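The incr-then-expire pattern from the class example amounts to a fixed-window rate limiter. The sketch below shows it against a toy, single-threaded stand-in; `ToyCounterCache`, its injected clock, and `allow_request` are hypothetical names, not part of the interface, and the toy does not provide the atomicity a real adapter must.

```python
from datetime import timedelta
from typing import Callable, Dict, Optional, Tuple


class ToyCounterCache:
    """Hypothetical stand-in exposing only incr() and expire()."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        # key -> (counter value, absolute expiry time or None)
        self._data: Dict[str, Tuple[int, Optional[float]]] = {}

    def _live(self, key: str) -> Optional[Tuple[int, Optional[float]]]:
        """Return the entry for key, dropping it first if it has expired."""
        entry = self._data.get(key)
        if entry and entry[1] is not None and entry[1] <= self._clock():
            del self._data[key]
            return None
        return entry

    def incr(self, key: str, amount: int = 1) -> int:
        value, deadline = self._live(key) or (0, None)  # missing key starts at 0
        value += amount
        self._data[key] = (value, deadline)
        return value

    def expire(self, key: str, ttl: timedelta) -> bool:
        entry = self._live(key)
        if entry is None:
            return False
        self._data[key] = (entry[0], self._clock() + ttl.total_seconds())
        return True


def allow_request(cache: ToyCounterCache, key: str, limit: int, window: timedelta) -> bool:
    """Fixed-window check: the first hit in a window starts the TTL."""
    count = cache.incr(key)
    if count == 1:
        cache.expire(key, window)
    return count <= limit
```

One design caveat of this pattern: incr and expire are two separate calls, so a caller that stops between them leaves a counter with no TTL.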
decr
abstractmethod
decr(key: str, amount: int = 1) -> int
Atomically decrement a counter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Counter key | required |
| amount | int | Decrement amount (default 1) | 1 |
Returns:
| Type | Description |
|---|---|
| int | New counter value after decrement |
expire
abstractmethod
expire(key: str, ttl: timedelta) -> bool
Set expiration on existing key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key | required |
| ttl | timedelta | Time-to-live duration | required |
Returns:
| Type | Description |
|---|---|
| bool | True if key exists and expiration was set |
ttl
abstractmethod
ttl(key: str) -> int | None
Get remaining TTL in seconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key | required |
Returns:
| Type | Description |
|---|---|
| int \| None | Remaining TTL in seconds |
setnx
setnx(
key: str, value: Any, ttl: timedelta | None = None
) -> bool
Set value only if key does not exist (SET if Not eXists).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key | required |
| value | Any | Value to set | required |
| ttl | timedelta \| None | Optional time-to-live | None |
Returns:
| Type | Description |
|---|---|
| bool | True if key was set (didn't exist), False otherwise |
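A common use of set-if-absent is letting exactly one caller claim a piece of work. The sketch below illustrates only the return-value contract; `ToySetnxCache` and `claim_job` are hypothetical, and the toy ignores the ttl argument.

```python
from typing import Any, Dict, Optional


class ToySetnxCache:
    """Hypothetical dict-backed stand-in; ttl is accepted but ignored."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)

    def setnx(self, key: str, value: Any, ttl: Optional[object] = None) -> bool:
        if key in self._data:
            return False  # key already present: leave it untouched
        self._data[key] = value
        return True


def claim_job(cache: ToySetnxCache, job_id: str, worker: str) -> bool:
    """First worker to call this for a job wins; later callers get False."""
    return cache.setnx(f"job:{job_id}:owner", worker)
```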
cas_dict_field
cas_dict_field(
key: str,
field: str,
expected: Any,
new_value: dict[str, Any],
ttl: timedelta | None = None,
) -> bool
Atomic compare-and-set on a single field of a dict-valued record.
Reads the existing record at key; if it is a dict whose
field equals expected, replaces the entire record with
new_value (with optional TTL). Otherwise, returns False
without writing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Cache key holding a dict-valued record | required |
| field | str | Field name within the dict to check | required |
| expected | Any | Expected current value of field | required |
| new_value | dict[str, Any] | Replacement record (full dict, not a partial update) | required |
| ttl | timedelta \| None | Optional time-to-live for the replacement record | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the record matched and was replaced, False otherwise (key missing, value not a dict, field mismatch). |
Note
The base implementation is a non-atomic get → check → set
two-step. Production adapters (Redis, Memory) override with
atomic implementations. IdempotencyGate validates that an
atomic override is in use via
_validate_atomic_cas_dict_field to prevent silent
inheritance of the non-atomic default.
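The compare-and-set semantics can be illustrated with an in-memory sketch in which a mutex makes the check and the write a single step. `ToyCasCache` is hypothetical and is not how the Redis or Memory adapters are implemented. Treating a missing field as a mismatch is an assumption, and ttl is omitted.

```python
import threading
from typing import Any, Dict, Optional


class ToyCasCache:
    """Hypothetical in-memory cache with an atomic cas_dict_field()."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._mutex = threading.Lock()

    def set(self, key: str, value: Any) -> None:
        with self._mutex:
            self._data[key] = value

    def get(self, key: str) -> Optional[Any]:
        with self._mutex:
            return self._data.get(key)

    def cas_dict_field(
        self, key: str, field: str, expected: Any, new_value: Dict[str, Any]
    ) -> bool:
        # Check and write happen under one lock, so no other thread can
        # change the record between the comparison and the replacement.
        with self._mutex:
            current = self._data.get(key)
            if not isinstance(current, dict):
                return False  # key missing or value not a dict
            if field not in current or current[field] != expected:
                return False  # field mismatch
            self._data[key] = dict(new_value)  # replace the whole record
            return True
```

With this shape, two callers racing to move a record from "PENDING" to "DONE" cannot both succeed: the second sees the updated field and gets False.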
get_lock
abstractmethod
get_lock(
name: str,
timeout: timedelta = timedelta(seconds=10),
blocking_timeout: float | None = None,
) -> DistributedLock
Get a distributed lock instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Lock name: the post-prefix portion of the storage key. The adapter prepends its own prefix (see Prefix semantics). | required |
| timeout | timedelta | Lock auto-release timeout (prevents deadlocks) | timedelta(seconds=10) |
| blocking_timeout | float \| None | Max time to wait when acquiring | None |
Returns:
| Type | Description |
|---|---|
| DistributedLock | DistributedLock instance |
Example
with cache.get_lock("circuit_breaker:payment") as lock:
    # Critical section - only one process executes this
    transition_circuit_breaker_state()
Prefix semantics
cache.get_lock("foo") on a Redis adapter with default
settings writes the Redis key baldur:foo (single prefix
applied by _make_key). Inside TestModeContext.start()
it shifts to xtest:baldur:foo (Redis-only).
On Memcached / InMemory the configured static key_prefix
is honored without TestModeContext awareness.
For SCAN observability of lock keys, callers should embed a
lock: segment in the name (e.g.,
cache.get_lock("idempotency:lock:order:abc")).
Note
Always use locks with a context manager to ensure release. The timeout parameter prevents deadlocks if a process crashes while holding the lock.
mget
abstractmethod
mget(keys: list[str]) -> dict[str, Any]
Get multiple values at once.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | List of cache keys | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict mapping keys to values (missing keys omitted) |
mset
abstractmethod
mset(
mapping: dict[str, Any], ttl: timedelta | None = None
) -> bool
Set multiple values at once.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapping | dict[str, Any] | Key-value pairs to set | required |
| ttl | timedelta \| None | Optional TTL for all keys | None |
Returns:
| Type | Description |
|---|---|
| bool | True if successful |
mdelete
mdelete(keys: list[str]) -> int
Delete multiple keys at once.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[str] | List of cache keys to delete | required |
Returns:
| Type | Description |
|---|---|
| int | Number of keys that were deleted |
hget
hget(name: str, key: str) -> Any | None
Get a field from a hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Hash name | required |
| key | str | Field key within the hash | required |
Returns:
| Type | Description |
|---|---|
| Any \| None | Field value or None |
hset
hset(name: str, key: str, value: Any) -> bool
Set a field in a hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Hash name | required |
| key | str | Field key within the hash | required |
| value | Any | Field value | required |
Returns:
| Type | Description |
|---|---|
| bool | True if successful |
hgetall
hgetall(name: str) -> dict[str, Any]
Get all fields from a hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Hash name | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict of all fields and values |
push_limit
push_limit(
key: str,
value: Any,
max_len: int,
ttl: timedelta | None = None,
) -> int
Append value to a list and trim to max_len (oldest entries dropped).
The default implementation is a non-atomic read-modify-write (RMW) via get()/set(). Redis and InMemory adapters override it with native atomic operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | List key | required |
| value | Any | Value to append (must be serializable) | required |
| max_len | int | Maximum list length (oldest entries trimmed) | required |
| ttl | timedelta \| None | Time-to-live for the key (renewed on each call) | None |
Returns:
| Type | Description |
|---|---|
| int | Pre-trim length (length after push, before trim). |
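The append, trim, and pre-trim return value can be sketched with a plain dict standing in for the cache. This mirrors the read-modify-write shape of the default implementation only in spirit: `push_limit_sketch` and the `store` dict are hypothetical, ttl is omitted, and treating max_len <= 0 as "keep nothing" is an assumption.

```python
from typing import Any, Dict, List


def push_limit_sketch(store: Dict[str, List[Any]], key: str, value: Any, max_len: int) -> int:
    """Append value, keep only the newest max_len entries, return pre-trim length."""
    items = list(store.get(key, []))  # read
    items.append(value)               # modify
    pre_trim = len(items)
    # items[-0:] would keep everything, so handle non-positive max_len explicitly.
    store[key] = items[-max_len:] if max_len > 0 else []  # write
    return pre_trim
```

Pushing 1 through 5 with max_len=3 returns 1, 2, 3, 4, 4 and leaves [3, 4, 5] stored: once the list is full, every push reports max_len + 1 before the oldest entry is dropped.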
list_range
list_range(key: str, start: int, end: int) -> list[Any]
Return elements from start to end (inclusive) of a list.
Default implementation uses get() + slice. Redis and InMemory adapters override with native operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | List key | required |
| start | int | Start index (0-based, negative supported) | required |
| end | int | End index (inclusive, -1 means last element) | required |
Returns:
| Type | Description |
|---|---|
| list[Any] | List of elements in the specified range |
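Because the end index is inclusive, the range does not map directly onto a Python slice. The sketch below shows one way to translate it, assuming Redis LRANGE-style handling of out-of-range indices (clamping rather than raising); that clamping behaviour and the function name are assumptions, not something the interface states.

```python
from typing import Any, List


def list_range_sketch(items: List[Any], start: int, end: int) -> List[Any]:
    """Return items[start..end] with an inclusive end and negative indices."""
    n = len(items)
    if start < 0:
        start = max(n + start, 0)  # count from the tail, clamp at the head
    if end < 0:
        end = n + end              # -1 becomes the last index
    if start >= n or end < start:
        return []
    return items[start : end + 1]  # +1 converts inclusive end to a slice bound
```

For example, (0, -1) returns the whole list and (-2, -1) returns the last two elements.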
health_check
abstractmethod
health_check() -> bool
Check if cache backend is reachable.
Returns:
| Type | Description |
|---|---|
| bool | True if healthy and connected |
flush_all
abstractmethod
flush_all() -> bool
Clear all keys (USE WITH CAUTION - mainly for testing).
Returns:
| Type | Description |
|---|---|
| bool | True if successful |
Warning
This will delete ALL data in the cache. Only use in testing environments or with explicit confirmation.
ping
ping() -> bool
Simple connectivity check.
Returns:
| Type | Description |
|---|---|
| bool | True if connection is alive |
keys
keys(pattern: str = '*') -> list[str]
Find keys matching a pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pattern | str | Glob-style pattern (e.g., "circuit_breaker:*") | '*' |
Returns:
| Type | Description |
|---|---|
| list[str] | List of matching keys |
Warning
Use with caution in production - may be slow with many keys. Default implementation returns empty list.
scan
scan(
pattern: str = "*", count: int = 100
) -> tuple[int, list[str]]
Incrementally iterate keys matching a pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pattern | str | Glob-style pattern | '*' |
| count | int | Approximate number of keys per iteration | 100 |
Returns:
| Type | Description |
|---|---|
| tuple[int, list[str]] | Tuple of (cursor, keys) - cursor 0 means scan complete |
Note
Default implementation returns (0, []). Override for implementations that support scanning.
DistributedLock
Bases: ABC
Distributed lock interface for cross-process synchronization.
Used by CircuitBreaker for state transitions and other critical sections that require mutual exclusion across multiple processes or servers.
Supports context manager protocol for safe usage:
with cache.get_lock("circuit_breaker:payment") as lock:
    # Critical section - only one process can execute
    circuit_breaker.transition_state()
Implementations
- RedisDistributedLock (Redis-based)
- InMemoryLock (for testing - single process only)
Storage-key contract
Lock implementations MUST treat the constructor's full_key
argument as the verbatim storage key. The owning adapter
(cache.get_lock(name)) is the single point that resolves the
user-supplied lock name into a full key by routing it through its
own _make_key() (which honors key_prefix,
TestModeContext, and NamespaceSettings — Redis
only). The lock writes full_key directly to storage.
Anti-pattern: lock implementations MUST NOT hardcode any
prefix segment (e.g., f"lock:{name}") inside __init__.
Earlier RedisDistributedLock versions did exactly this, producing
double-prefixed keys (baldur:lock:idempotency:lock:order:abc)
and bypassing the adapter's prefix system entirely. New adapter
implementations (DynamoDB / etcd / ZooKeeper) must follow the
contract: full key in, full key written, no in-class
transformation.
SCAN-observability convention: callers that want a recognizable
lock: segment in storage should embed it in the name passed
to cache.get_lock() (e.g., cache.get_lock("idempotency:lock:order:abc"))
rather than relying on the lock class to add it.
Lifecycle: lock construction, acquire, and release should
occur within a single TestModeContext scope. The natural
with cache.get_lock(name) as lock: pattern enforces this.
Crossing a context boundary between construction and acquire
leaves the lock in the construction-time namespace — safe (no
orphan) but breaks synthetic isolation for that one lock.
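The storage-key contract above can be sketched with a toy adapter and lock. `ToyAdapter`, `ToyLock`, and the dict used as storage are hypothetical. The point is only the division of labour: the adapter resolves the name through `_make_key()`, and the lock writes the `full_key` it was given without adding anything.

```python
from typing import Dict


class ToyLock:
    """Hypothetical lock that treats full_key as the verbatim storage key."""

    def __init__(self, storage: Dict[str, str], full_key: str) -> None:
        self._storage = storage
        self._full_key = full_key  # no f"lock:{...}" or other prefix added here

    def __enter__(self) -> "ToyLock":
        if self._full_key in self._storage:
            raise RuntimeError("lock already held")
        self._storage[self._full_key] = "held"
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self._storage.pop(self._full_key, None)


class ToyAdapter:
    """Hypothetical adapter: the single place where names become full keys."""

    def __init__(self, key_prefix: str = "baldur") -> None:
        self.storage: Dict[str, str] = {}
        self._key_prefix = key_prefix

    def _make_key(self, name: str) -> str:
        return f"{self._key_prefix}:{name}"

    def get_lock(self, name: str) -> ToyLock:
        return ToyLock(self.storage, self._make_key(name))
```

A caller that wants a recognizable lock: segment passes it in the name, for example `adapter.get_lock("idempotency:lock:order:abc")`, which this sketch stores as `baldur:idempotency:lock:order:abc`.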
acquire
abstractmethod
acquire(
blocking: bool = True, timeout: float | None = None
) -> bool
Acquire the lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| blocking | bool | If True, block until lock is acquired | True |
| timeout | float \| None | Max seconds to wait (None = infinite) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if lock was acquired, False otherwise |
Note
If blocking=False and lock is held, returns False immediately. If blocking=True and timeout expires, returns False.
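The blocking and timeout combinations in the note map closely onto the standard library's `threading.Lock.acquire`. The sketch below shows that mapping for a single process. `ToyInProcessLock` is a hypothetical name, and a real distributed implementation would also need ownership tracking and an auto-release TTL.

```python
import threading
from typing import Optional


class ToyInProcessLock:
    """Hypothetical single-process lock with the acquire() contract above."""

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
        if not blocking:
            # Held lock -> return False immediately, never wait.
            return self._lock.acquire(blocking=False)
        # threading.Lock uses -1 for "wait forever"; the interface uses None.
        return self._lock.acquire(timeout=-1 if timeout is None else timeout)

    def release(self) -> None:
        self._lock.release()

    def locked(self) -> bool:
        return self._lock.locked()
```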
release
abstractmethod
release() -> None
locked
abstractmethod
locked() -> bool
Check if lock is currently held by anyone.
Returns:
| Type | Description |
|---|---|
| bool | True if lock is held, False if available |
owned
abstractmethod
owned() -> bool
Check if lock is held by current owner.
Returns:
| Type | Description |
|---|---|
| bool | True if lock is held by this instance |
extend
extend(additional_time: timedelta) -> bool
Extend the lock's TTL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| additional_time | timedelta | Time to add to current TTL | required |
Returns:
| Type | Description |
|---|---|
| bool | True if extension was successful |
Note
Default implementation returns False (not supported). Override in implementations that support TTL extension.
LockAcquisitionError
LockAcquisitionError(message: str = '', *, code: str = '')
LockNotOwnedError
LockNotOwnedError(message: str = '', *, code: str = '')
generate_lock_owner_id
generate_lock_owner_id() -> str
Standard lock owner ID for all DistributedLock implementations.
Rate limiting & throttle
RateLimitStorageType
Bases: str, Enum
Type of rate limit storage backend.
RateLimitState
dataclass
RateLimitState(
key: str,
cooldown_until: float = 0.0,
consecutive_429s: int = 0,
last_updated: float = 0.0,
)
Rate limit state for a specific endpoint/service.
Attributes:
| Name | Type | Description |
|---|---|---|
| key | str | Unique identifier (e.g., "payment_api", "external_service") |
| cooldown_until | float | Unix timestamp when cooldown ends (0 = no cooldown) |
| consecutive_429s | int | Number of consecutive 429 responses |
| last_updated | float | Unix timestamp of last state update |
is_in_cooldown
property
is_in_cooldown: bool
Check if currently in cooldown period.
remaining_cooldown
property
remaining_cooldown: float
Get remaining cooldown time in seconds.
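The two properties can plausibly be derived from cooldown_until and the current time. The sketch below is a guess at that logic under the stated field meanings. `RateLimitStateSketch` is a hypothetical copy of the dataclass, and clamping the remaining time at zero is an assumption.

```python
import time
from dataclasses import dataclass


@dataclass
class RateLimitStateSketch:
    """Hypothetical mirror of RateLimitState with assumed property logic."""

    key: str
    cooldown_until: float = 0.0
    consecutive_429s: int = 0
    last_updated: float = 0.0

    @property
    def is_in_cooldown(self) -> bool:
        # cooldown_until == 0 means "no cooldown", which is always in the past.
        return time.time() < self.cooldown_until

    @property
    def remaining_cooldown(self) -> float:
        # Assumed to never go negative once the cooldown has elapsed.
        return max(0.0, self.cooldown_until - time.time())
```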
RateLimitStorageInterface
Bases: ABC
Abstract interface for distributed rate limit state storage.
Implementations must be thread-safe and support atomic operations.
Usage
storage = get_rate_limit_storage()
On 429 response
storage.set_cooldown("payment_api", cooldown_until=time.time() + 60)
storage.increment_consecutive_429s("payment_api")
Before making request
state = storage.get_state("payment_api")
if state.is_in_cooldown:
    time.sleep(state.remaining_cooldown)
Implementations
- RedisRateLimitStorage (fastest, requires Redis)
- DatabaseRateLimitStorage (100% compatible, slightly slower)
- InMemoryRateLimitStorage (single process only, for testing)
storage_type
abstractmethod
property
storage_type: RateLimitStorageType
Return the type of storage backend.
get_state
abstractmethod
get_state(key: str) -> RateLimitState
Get the current rate limit state for a key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique identifier for the rate-limited resource | required |
Returns:
| Type | Description |
|---|---|
| RateLimitState | RateLimitState with current cooldown info |
Note
Returns a default state (no cooldown) if key doesn't exist.
set_cooldown
abstractmethod
set_cooldown(
key: str, cooldown_until: float, ttl: int | None = None
) -> None
Set the cooldown end time for a key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique identifier for the rate-limited resource | required |
| cooldown_until | float | Unix timestamp when cooldown should end | required |
| ttl | int \| None | Time-to-live in seconds (for cleanup) | None |
Note
This should be an atomic operation to prevent race conditions.
increment_consecutive_429s
abstractmethod
increment_consecutive_429s(key: str) -> int
Atomically increment the consecutive 429 counter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique identifier for the rate-limited resource | required |
Returns:
| Type | Description |
|---|---|
| int | New counter value after increment |
Note
Used for exponential backoff calculation.
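How the counter feeds a backoff calculation is not specified here. One common shape is a capped doubling delay, sketched below. The function name, the 1-second base, the 60-second cap, and the doubling factor are all assumptions chosen for illustration.

```python
def backoff_seconds(consecutive_429s: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Capped exponential delay: base, 2*base, 4*base, ... up to cap."""
    if consecutive_429s <= 0:
        return 0.0
    # Bound the exponent so very large counters cannot overflow a float.
    exponent = min(consecutive_429s - 1, 32)
    return min(cap, base * (2 ** exponent))
```

The result could be passed to set_cooldown as `cooldown_until=time.time() + backoff_seconds(count)`, where `count` is the value returned by increment_consecutive_429s.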
reset_consecutive_429s
abstractmethod
reset_consecutive_429s(key: str) -> None
Reset the consecutive 429 counter on successful request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique identifier for the rate-limited resource | required |
clear
abstractmethod
clear(key: str) -> None
Clear all rate limit state for a key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique identifier for the rate-limited resource | required |
is_available
is_available() -> bool
Check if the storage backend is available.
Returns:
| Type | Description |
|---|---|
| bool | True if the storage is operational |
Note
Used for fallback detection. Default returns True.
RateLimitStorageError
RateLimitStorageError(message: str = '', *, code: str = '')
RateLimitStorageUnavailableError
RateLimitStorageUnavailableError(
message: str = "", *, code: str = ""
)
AdaptiveThrottle
Bases: Protocol
Protocol for the PRO adaptive throttle singleton.