baldur.services — Idempotency

The idempotency service and its key/domain types, used to deduplicate at-least-once operations.

IdempotencyService

IdempotencyService(
    cache_ttl: int | None = None,
    time_provider: TimeProvider | None = None,
    clock_skew_tolerance_seconds: float | None = None,
)

Service for checking and managing idempotency of operations.

Provides both cache-based (fast) and database-based (reliable) idempotency checking.

For framework-agnostic usage, provide lookup callbacks when calling check().

Example

Framework-agnostic usage with TimeProvider

from baldur.core.time_provider import MockTimeProvider

service = IdempotencyService(time_provider=MockTimeProvider())
key = IdempotencyKey.for_operation("order", 123, "process")
result = service.check(key, lookup_fn=my_lookup)

Initialize the idempotency service.

Parameters:

Name Type Description Default
cache_ttl int | None

Custom cache TTL in seconds

None
time_provider TimeProvider | None

TimeProvider for testable time operations

None
clock_skew_tolerance_seconds float | None

Clock skew tolerance for distributed checks

None

DEFAULT_CACHE_TTL property

DEFAULT_CACHE_TTL: int

Default TTL for cache-based idempotency.

EXTENDED_CACHE_TTL property

EXTENDED_CACHE_TTL: int

Extended TTL for operations that must be remembered longer than the default.

clock_skew_tolerance property

clock_skew_tolerance: float

Clock skew tolerance in seconds for distributed checks.

time_provider property

time_provider: TimeProvider

Get the time provider for this service.

now

now() -> datetime

Get current time using the configured TimeProvider.

Returns:

Type Description
datetime

Current datetime from time provider

check

check(
    key: IdempotencyKey,
    lookup_fn: Callable[..., Any] | None = None,
    cache_ttl: int | None = None,
) -> IdempotencyResult

Check if an operation has already been processed.

Parameters:

Name Type Description Default
key IdempotencyKey

The idempotency key to check

required
lookup_fn Callable[..., Any] | None

Optional callback to check database

None
cache_ttl int | None

Optional custom TTL for cache

None

Returns:

Type Description
IdempotencyResult

IdempotencyResult with duplicate status

Note

Gracefully degrades to DB-only check if Redis is unavailable. When BALDUR_IDEMPOTENCY_ENABLED=false, returns IdempotencyResult(is_duplicate=False, message="idempotency disabled") without consulting cache or database.
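The cache-first, database-fallback behaviour described in the note can be sketched in a few lines. This is a self-contained illustration, not baldur's implementation: `SketchResult`, `sketch_check`, the plain dict standing in for the cache, and the zero-argument `lookup_fn` calling convention are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for IdempotencyResult."""
    is_duplicate: bool
    message: str = ""


def sketch_check(
    cache_key: str,
    cache: Optional[dict],
    lookup_fn: Optional[Callable[[], Any]] = None,
    enabled: bool = True,
) -> SketchResult:
    # Globally disabled: report "not a duplicate" without touching any store.
    if not enabled:
        return SketchResult(False, "idempotency disabled")
    # Fast path: a cache hit is enough to call the operation a duplicate.
    if cache is not None and cache_key in cache:
        return SketchResult(True, "found in cache")
    # Reliable path: ask the database through the caller-supplied callback.
    # cache=None models an unavailable cache, so this is the DB-only fallback.
    if lookup_fn is not None and lookup_fn() is not None:
        return SketchResult(True, "found in database")
    return SketchResult(False, "not processed")


processed_rows = {"order:123:process": 42}  # pretend database table


def lookup_order_123() -> Optional[int]:
    return processed_rows.get("order:123:process")


hit_via_db = sketch_check("order:123:process", cache=None, lookup_fn=lookup_order_123)
miss = sketch_check("order:999:process", cache={}, lookup_fn=lambda: None)
disabled = sketch_check("order:123:process", cache={}, lookup_fn=lookup_order_123, enabled=False)
```

In this sketch a missing cache only removes the fast path, so the result still reflects what the database knows.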

mark_as_processed

mark_as_processed(
    key: IdempotencyKey,
    record_id: int | None = None,
    ttl: int | None = None,
) -> bool

Mark an operation as processed in the cache.

Call this after successfully completing an operation.

Parameters:

Name Type Description Default
key IdempotencyKey

The idempotency key

required
record_id int | None

Optional record ID to cache

None
ttl int | None

Optional custom TTL

None

Returns:

Type Description
bool

True if cache was updated, False if cache was unavailable. The operation is still considered successful even if cache fails, as the DB is the source of truth. When idempotency is globally disabled, returns True without writing to cache.
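The advice to mark a key only after the operation succeeds can be illustrated with a small stand-alone sketch. The names `run_once`, `charge_order`, and `failing_charge` are hypothetical, and a plain dict stands in for the cache; the real service is not used here.

```python
from typing import Callable, Dict


def run_once(cache_key: str, processed: Dict[str, int], work: Callable[[], int]) -> str:
    # Duplicate delivery: the key was marked by an earlier successful run.
    if cache_key in processed:
        return "skipped"
    # If work() raises, nothing is marked and a retry can run it again.
    record_id = work()
    # Mark only after success, storing the record ID alongside the key.
    processed[cache_key] = record_id
    return "processed"


charges = []


def charge_order() -> int:
    charges.append("order-123")  # the side effect that must not repeat
    return 42  # hypothetical record ID


def failing_charge() -> int:
    raise RuntimeError("payment gateway timeout")


processed: Dict[str, int] = {}
first = run_once("order:123:process", processed, charge_order)
second = run_once("order:123:process", processed, charge_order)

try:
    run_once("order:456:process", processed, failing_charge)
except RuntimeError:
    pass  # the failed operation leaves no marker behind
```

Marking before the work runs would instead turn a failed first attempt into a permanent "duplicate", which is why the order matters.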

batch_check

batch_check(
    keys: list[IdempotencyKey],
) -> list[IdempotencyResult]

Batch idempotency check using cache get_many (Redis MGET).

Parameters:

Name Type Description Default
keys list[IdempotencyKey]

List of idempotency keys to check

required

Returns:

Type Description
list[IdempotencyResult]

List of IdempotencyResult in the same order as input keys. When idempotency is globally disabled, returns all is_duplicate=False results without consulting cache.
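The ordering guarantee is worth a small illustration, because a multi-get typically returns a mapping of the keys it found rather than a positional list. The sketch below assumes a hypothetical `get_many` callable with that shape and uses plain booleans in place of IdempotencyResult.

```python
from typing import Callable, Dict, List


def sketch_batch_check(
    cache_keys: List[str],
    get_many: Callable[[List[str]], Dict[str, object]],
) -> List[bool]:
    # One round trip for all keys; the mapping holds only the keys that exist.
    found = get_many(cache_keys)
    # Rebuild the answer by walking the input, so output order matches input order.
    return [key in found for key in cache_keys]


stored = {"event:evt-1": 1, "event:evt-3": 1}  # pretend cache contents


def fake_get_many(keys: List[str]) -> Dict[str, object]:
    return {k: stored[k] for k in keys if k in stored}


flags = sketch_batch_check(
    ["event:evt-1", "event:evt-2", "event:evt-3"],
    fake_get_many,
)
```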

batch_mark_as_processed

batch_mark_as_processed(
    keys: list[IdempotencyKey], ttl: int | None = None
) -> bool

Batch mark operations as processed using cache set_many (Redis Pipeline SET).

Parameters:

Name Type Description Default
keys list[IdempotencyKey]

List of idempotency keys to mark

required
ttl int | None

Optional custom TTL

None

Returns:

Type Description
bool

True if cache was updated, False if cache was unavailable. When idempotency is globally disabled, returns True without writing to cache.

acquire_lock

acquire_lock(key: IdempotencyKey, ttl_seconds: int) -> bool

Acquire a distributed lock for the given idempotency key.

Delegates to CacheProviderInterface.get_lock(), which returns a DistributedLock. When no cache adapter is registered, the resolver returns an in-process InMemoryCacheAdapter whose get_lock works within a single process only. That is sufficient for single-worker installs; multi-worker deployments must register Redis (enforced at init time).

Parameters:

Name Type Description Default
key IdempotencyKey

Idempotency key identifying the lock scope

required
ttl_seconds int

Lock auto-expiry in seconds (prevents deadlock on crash)

required

Returns:

Type Description
bool

True if lock acquired, False if already held by another process.

release_lock

release_lock(key: IdempotencyKey) -> bool

Release a previously acquired distributed lock.

Best-effort: if release fails (network issue), TTL auto-expires the lock.

Parameters:

Name Type Description Default
key IdempotencyKey

Idempotency key identifying the lock scope

required

Returns:

Type Description
bool

True if lock was found and release attempted, False if not held.
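The acquire/release contract, including TTL auto-expiry, can be sketched with a single-process lock table and an injected clock. `SketchLockStore` is a hypothetical stand-in written for this example; it is not thread-safe, does not track lock ownership, and is not the InMemoryCacheAdapter or DistributedLock used by the service.

```python
from typing import Callable, Dict


class SketchLockStore:
    """Single-process lock table with expiry (illustration only)."""

    def __init__(self, now: Callable[[], float]) -> None:
        self._now = now  # injected clock, so tests can move time forward
        self._expires_at: Dict[str, float] = {}

    def acquire(self, name: str, ttl_seconds: int) -> bool:
        expiry = self._expires_at.get(name)
        if expiry is not None and expiry > self._now():
            return False  # still held and not yet expired
        # Free, or the previous holder's TTL ran out: take the lock.
        self._expires_at[name] = self._now() + ttl_seconds
        return True

    def release(self, name: str) -> bool:
        # True if an entry existed, False if the lock was not held.
        return self._expires_at.pop(name, None) is not None


clock = [0.0]  # mutable cell acting as a fake clock
locks = SketchLockStore(now=lambda: clock[0])

first = locks.acquire("chaos:payment_api", ttl_seconds=60)
second = locks.acquire("chaos:payment_api", ttl_seconds=60)

clock[0] = 61.0  # the holder "crashed"; its TTL has now elapsed
after_expiry = locks.acquire("chaos:payment_api", ttl_seconds=60)

released = locks.release("chaos:payment_api")
released_again = locks.release("chaos:payment_api")
```

The expiry check is what keeps a crashed holder from blocking the key forever, which is the deadlock-prevention role the `ttl_seconds` parameter plays.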

clear

clear(key: IdempotencyKey) -> bool

Clear an idempotency key from cache.

Use with caution: intended only for cleanup or testing.

Parameters:

Name Type Description Default
key IdempotencyKey

The idempotency key to clear

required

Returns:

Type Description
bool

True if cache was cleared, False if cache was unavailable.

IdempotencyKey dataclass

IdempotencyKey(
    domain: IdempotencyDomain,
    key: str,
    components: dict[str, Any],
)

Represents an idempotency key with domain context.

The key is a combination of domain-specific identifiers that uniquely identify an operation.

cache_key property

cache_key: str

Get the cache key for Redis/memcached storage.

hash property

hash: str

Get a hash of the key for indexing.
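One plausible way to derive a storage key and an index hash from a domain and a raw key is sketched below. The `idempotency:` prefix, the colon-separated layout, and the choice of SHA-256 are assumptions for illustration; the documentation does not state the actual formats.

```python
import hashlib


def sketch_cache_key(domain: str, key: str, prefix: str = "idempotency") -> str:
    # Namespacing by domain keeps equal raw keys in different domains apart.
    return f"{prefix}:{domain}:{key}"


def sketch_hash(domain: str, key: str) -> str:
    # A fixed-length digest is convenient for an indexed database column.
    return hashlib.sha256(f"{domain}:{key}".encode("utf-8")).hexdigest()


same_a = sketch_hash("event", "evt-1")
same_b = sketch_hash("event", "evt-1")
other = sketch_hash("event", "evt-2")
storage_key = sketch_cache_key("event", "evt-1")
```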

for_operation classmethod

for_operation(
    entity_type: str,
    entity_id: int | str,
    operation: str,
    domain: IdempotencyDomain = IdempotencyDomain.EXTERNAL_SERVICE,
) -> IdempotencyKey

Create an idempotency key for a generic operation.

Parameters:

Name Type Description Default
entity_type str

Type of entity (e.g., "order", "user", "product")

required
entity_id int | str

The entity ID (int PKs or string UUIDs/correlation IDs).

required
operation str

The operation being performed (e.g., "process", "update")

required
domain IdempotencyDomain

The domain category

EXTERNAL_SERVICE

Returns:

Type Description
IdempotencyKey

IdempotencyKey for the operation

for_event classmethod

for_event(event_id: str) -> IdempotencyKey

Create an idempotency key for event processing.

Parameters:

Name Type Description Default
event_id str

The unique event ID

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for the event

for_resource_action classmethod

for_resource_action(
    resource_type: str,
    resource_id: int,
    action: str,
    amount: int | None = None,
) -> IdempotencyKey

Create an idempotency key for resource actions.

Parameters:

Name Type Description Default
resource_type str

Type of resource

required
resource_id int

The resource ID

required
action str

The action being performed

required
amount int | None

Optional amount for the action

None

Returns:

Type Description
IdempotencyKey

IdempotencyKey for the resource action

custom classmethod

custom(key: str, **components: Any) -> IdempotencyKey

Create a custom idempotency key.

Parameters:

Name Type Description Default
key str

The raw key string

required
**components Any

Key components for debugging

{}

Returns:

Type Description
IdempotencyKey

IdempotencyKey with custom domain

for_chaos_experiment classmethod

for_chaos_experiment(
    schedule_id: str,
    experiment_type: str,
    target_service: str,
) -> IdempotencyKey

Create an idempotency key for a Chaos experiment.

Prevents experiments of the same schedule from running concurrently.

Parameters:

Name Type Description Default
schedule_id str

Schedule ID

required
experiment_type str

Experiment type (e.g., latency_injection, fault_injection)

required
target_service str

Target service

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for chaos experiment

for_chaos_service_lock classmethod

for_chaos_service_lock(
    target_service: str,
) -> IdempotencyKey

Service-level Chaos experiment lock.

Prevents two or more experiments from running concurrently on the same service, which keeps root-cause analysis unambiguous.

A dual-lock pattern used together with the Schedule lock:

- Service Lock: concurrency control (released when the experiment ends)
- Schedule Lock: re-execution prevention (kept until TTL)

Parameters:

Name Type Description Default
target_service str

Target service name

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for service-level chaos lock

Usage

Example of the dual-lock pattern

service_lock = IdempotencyKey.for_chaos_service_lock(target_service)
schedule_lock = IdempotencyKey.for_chaos_experiment(schedule_id, exp_type, target_service)

# 1. Acquire the service lock (concurrency control)
if not idempotency.acquire_lock(service_lock, ttl_seconds=7200):
    return "another experiment running"

# 2. Acquire the schedule lock (re-execution prevention)
if not idempotency.acquire_lock(schedule_lock, ttl_seconds=86400):
    idempotency.release_lock(service_lock)  # rollback
    return "schedule already executed"

try:
    # 3. Run the experiment
    execute_experiment()
finally:
    # 4. Release only the service lock (the schedule lock keeps its TTL)
    idempotency.release_lock(service_lock)
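For readers who want to trace the dual-lock flow without a running service, the same steps can be wrapped in a function and exercised against an in-memory set. Everything here is a stand-in: `held`, `acquire`, `release`, `run_experiment`, and the lock-name strings are hypothetical, and TTL expiry is left out for brevity.

```python
from typing import Callable, List, Set

held: Set[str] = set()  # stand-in for the distributed lock store (no TTLs here)


def acquire(name: str) -> bool:
    if name in held:
        return False
    held.add(name)
    return True


def release(name: str) -> None:
    held.discard(name)


def run_experiment(schedule_id: str, target_service: str, execute: Callable[[], None]) -> str:
    service_lock = f"chaos_service:{target_service}"
    schedule_lock = f"chaos_schedule:{schedule_id}:{target_service}"
    # 1. Service lock: at most one experiment per service at a time.
    if not acquire(service_lock):
        return "another experiment running"
    # 2. Schedule lock: this schedule must not run twice.
    if not acquire(schedule_lock):
        release(service_lock)  # roll back step 1
        return "schedule already executed"
    try:
        execute()  # 3. Run the experiment
    finally:
        release(service_lock)  # 4. The schedule lock is deliberately kept
    return "executed"


nested: List[str] = []


def experiment_body() -> None:
    # A second experiment on the same service starts while this one is running.
    nested.append(run_experiment("sched-2", "payment_api", lambda: None))


first = run_experiment("sched-1", "payment_api", experiment_body)
repeat = run_experiment("sched-1", "payment_api", lambda: None)
later = run_experiment("sched-2", "payment_api", lambda: None)
```

The nested call is rejected by the service lock and the repeated schedule by the schedule lock. "sched-2" can still run later because its rejected attempt never took the schedule lock.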

for_config_change classmethod

for_config_change(
    config_key: str,
    new_value_hash: str,
    changed_by: str,
    request_id: str | None = None,
    window_id: str | None = None,
) -> IdempotencyKey

Create an idempotency key for a configuration change.

Prevents the same configuration change from being applied twice.

Parameters:

Name Type Description Default
config_key str

Configuration key

required
new_value_hash str

Hash of the new value

required
changed_by str

Who made the change

required
request_id str | None

Request ID (only retries of the same request count as duplicates)

None
window_id str | None

Sliding-window ID (time-window based, recommended)

None

Idempotency-scope policy:

- request_id provided: only retries of the same request are duplicates
- window_id provided: only the same change within the same window is a duplicate
- neither: based on new_value_hash (legacy behavior)

Returns:

Type Description
IdempotencyKey

IdempotencyKey for config change

Reference: Architect Review - "distinguish intentional reconfiguration vs duplicates"
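The scope policy can be made concrete with a sketch of how the three cases might map to different key strings. The key layout, the precedence of request_id over window_id when both are given, the inclusion of changed_by in every key, and the fixed-bucket `bucket_window_id` helper are all assumptions; the documentation does not specify them.

```python
from typing import Optional


def bucket_window_id(epoch_seconds: float, width_seconds: int = 300) -> str:
    # Hypothetical helper: every timestamp in the same bucket shares one ID.
    return str(int(epoch_seconds) // width_seconds)


def sketch_config_scope(
    config_key: str,
    new_value_hash: str,
    changed_by: str,
    request_id: Optional[str] = None,
    window_id: Optional[str] = None,
) -> str:
    base = f"{config_key}:{new_value_hash}:{changed_by}"
    if request_id is not None:
        # Only retries carrying the same request ID collide.
        return f"{base}:req:{request_id}"
    if window_id is not None:
        # The same change collides only inside the same time window.
        return f"{base}:win:{window_id}"
    # Legacy scope: the same value always collides, even when re-applied on purpose.
    return base


in_window_a = sketch_config_scope("rate_limit", "abc", "alice", window_id=bucket_window_id(1000))
in_window_b = sketch_config_scope("rate_limit", "abc", "alice", window_id=bucket_window_id(1100))
next_window = sketch_config_scope("rate_limit", "abc", "alice", window_id=bucket_window_id(1300))
retry_1 = sketch_config_scope("rate_limit", "abc", "alice", request_id="req-1")
retry_2 = sketch_config_scope("rate_limit", "abc", "alice", request_id="req-2")
legacy = sketch_config_scope("rate_limit", "abc", "alice")
```

Under these assumptions, re-applying the same value in a later window or under a new request produces a fresh key, which is how an intentional reconfiguration is told apart from a duplicate.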

for_l2_sync classmethod

for_l2_sync(
    service_name: str, record_id: str, intended_state: str
) -> IdempotencyKey

Create an idempotency key for L2 synchronization.

Prevents the same record from being synchronized twice after recovery.

Parameters:

Name Type Description Default
service_name str

Service name

required
record_id str

Record ID

required
intended_state str

Target state

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for L2 sync

for_wal_recovery classmethod

for_wal_recovery(
    wal_entry_id: str, operation: str
) -> IdempotencyKey

Create an idempotency key for WAL recovery.

Prevents the same WAL entry from being processed twice.

Parameters:

Name Type Description Default
wal_entry_id str

WAL entry ID

required
operation str

Recovery-operation type

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for WAL recovery

for_auto_adjustment classmethod

for_auto_adjustment(
    module: str, parameter: str, target_value: str
) -> IdempotencyKey

Create an idempotency key for an autonomous adjustment.

Prevents the same adjustment from being applied twice.

Parameters:

Name Type Description Default
module str

Module name (circuit_breaker, retry, etc.)

required
parameter str

Parameter name

required
target_value str

Target value

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for auto adjustment

Note

Use AntiFlappingWindow separately for flapping checks:

get_anti_flapping_window().check_and_record(...)

for_recovery_action classmethod

for_recovery_action(
    action_type: str,
    target: str,
    region_id: str,
    session_id: str,
) -> IdempotencyKey

Create an idempotency key for a recovery action.

Prevents duplicate execution of the same recovery action across regions in a Multi-Region Active-Active environment.

Parameters:

Name Type Description Default
action_type str

Action type ("cb_reset", "pod_restart", "dlq_retry", etc.)

required
target str

Target (service name, Pod name, etc.)

required
region_id str

Executing region (e.g., "ap-northeast-2")

required
session_id str

Recovery session ID (prevents duplicates within the same recovery session)

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for recovery action

Example

# Check idempotency before a CB reset
key = IdempotencyKey.for_recovery_action(
    action_type="cb_reset",
    target="payment_api",
    region_id="ap-northeast-2",
    session_id="sess-12345",
)

result = idempotency_service.check(key)
if result.is_duplicate:
    logger.info("idempotency.already_executed_another_region")
    return

# Execute
circuit_breaker.reset("payment_api")

for_cb_reset classmethod

for_cb_reset(
    service_name: str, region_id: str, trigger_id: str
) -> IdempotencyKey

Idempotency key dedicated to a Circuit Breaker reset.

A convenience method for for_recovery_action, providing a CB-reset-specific interface.

Parameters:

Name Type Description Default
service_name str

Service name (e.g., "payment_api")

required
region_id str

Executing region (e.g., "ap-northeast-2")

required
trigger_id str

Trigger ID (e.g., recovery session ID)

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for CB reset action

Example

key = IdempotencyKey.for_cb_reset(
    service_name="payment_api",
    region_id="ap-northeast-2",
    trigger_id="recovery-sess-abc123",
)

if not idempotency_service.check(key).is_duplicate:
    circuit_breaker.reset("payment_api")

for_pod_restart classmethod

for_pod_restart(
    pod_name: str,
    namespace: str,
    region_id: str,
    session_id: str,
) -> IdempotencyKey

Idempotency key dedicated to a Pod restart.

Parameters:

Name Type Description Default
pod_name str

Pod name

required
namespace str

Kubernetes namespace

required
region_id str

Executing region

required
session_id str

Recovery session ID

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for pod restart action

for_dlq_retry classmethod

for_dlq_retry(
    queue_name: str,
    message_id: str,
    region_id: str,
    session_id: str,
) -> IdempotencyKey

Idempotency key dedicated to a DLQ retry.

Parameters:

Name Type Description Default
queue_name str

DLQ queue name

required
message_id str

Message ID

required
region_id str

Executing region

required
session_id str

Recovery session ID

required

Returns:

Type Description
IdempotencyKey

IdempotencyKey for DLQ retry action

for_dlq_replay classmethod

for_dlq_replay(
    dlq_id: int, domain: str, retry_count: int
) -> IdempotencyKey

Deduplicates DLQ replays: prevents concurrent replay of the same entry at the same attempt.

Uses DLQ entry PK (guaranteed unique per DB) + retry_count to scope idempotency per attempt. Different attempts (retry_count) get different keys, allowing intentional retries while blocking concurrent duplicates.
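Per-attempt scoping is easiest to see with concrete keys. The sketch below assumes a hypothetical key layout and uses an in-memory set in place of the cache; only the idea of combining the entry PK with retry_count comes from the description above.

```python
from typing import Set


def sketch_dlq_replay_key(dlq_id: int, domain: str, retry_count: int) -> str:
    # Entry PK plus attempt number: one key per (entry, attempt) pair.
    return f"dlq_replay:{domain}:{dlq_id}:attempt:{retry_count}"


in_flight: Set[str] = set()


def try_start_replay(dlq_id: int, domain: str, retry_count: int) -> bool:
    key = sketch_dlq_replay_key(dlq_id, domain, retry_count)
    if key in in_flight:
        return False  # the same entry is already being replayed at this attempt
    in_flight.add(key)
    return True


first_worker = try_start_replay(7, "payments", retry_count=1)
second_worker = try_start_replay(7, "payments", retry_count=1)
next_attempt = try_start_replay(7, "payments", retry_count=2)
```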

IdempotencyDomain

Bases: str, Enum

Domains that support idempotency checking (domain-neutral).

EXTERNAL_SERVICE class-attribute instance-attribute

EXTERNAL_SERVICE = 'external_service'

External service calls (payments, notifications, etc.).

INTERNAL_PROCESS class-attribute instance-attribute

INTERNAL_PROCESS = 'internal_process'

Internal processes (inventory decrement, point accrual, etc.).

ASYNC_TASK class-attribute instance-attribute

ASYNC_TASK = 'async_task'

Asynchronous tasks (Celery Task, etc.).

EVENT class-attribute instance-attribute

EVENT = 'event'

Event handling (Webhook, message, etc.).

CUSTOM class-attribute instance-attribute

CUSTOM = 'custom'

Custom domain.

CHAOS_EXPERIMENT class-attribute instance-attribute

CHAOS_EXPERIMENT = 'chaos_experiment'

Chaos experiment execution (prevents duplicate execution of the same experiment).

CHAOS_ZOMBIE_HUNTER class-attribute instance-attribute

CHAOS_ZOMBIE_HUNTER = 'chaos_zombie_hunter'

Zombie Hunter distributed lock (prevents duplicate rollback of orphan experiments).

Detects orphaned experiments and cleans them up safely.

CONFIG_CHANGE class-attribute instance-attribute

CONFIG_CHANGE = 'config_change'

Configuration change (prevents duplicate application of the same change).

L2_SYNC class-attribute instance-attribute

L2_SYNC = 'l2_sync'

L2 storage synchronization (prevents duplicate re-sync after recovery).

WAL_RECOVERY class-attribute instance-attribute

WAL_RECOVERY = 'wal_recovery'

WAL recovery (prevents duplicate processing of the same entry).

AUTO_ADJUSTMENT class-attribute instance-attribute

AUTO_ADJUSTMENT = 'auto_adjustment'

Autonomous adjustment (prevents duplicate application of the same adjustment).

RECOVERY_ACTION class-attribute instance-attribute

RECOVERY_ACTION = 'recovery_action'

Recovery action (prevents duplicate execution of CB resets, Pod restarts, DLQ retries, etc.).

get_idempotency_service

get_idempotency_service() -> IdempotencyService

Get the singleton IdempotencyService instance.