Skip to content

baldur_pro.services.replay — Replay Queue

Backpressure-aware replay of stored failures: ReplayQueueService with its BackpressureStatus and RateLimitStatus signals.

🔒 PRO Feature — requires a baldur-pro license

These symbols ship in the baldur-pro distribution. PRO modules import normally — there is no ImportError. PRO features activate only when baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system runs with OSS defaults and register_pro_services() logs entitlement.pro_registration_skipped.

replay

Replay Queue Package.

Provides replay queue infrastructure with rate limiting and backpressure support for the baldur framework.

Usage

from baldur_pro.services.replay import ( get_replay_queue_service, reset_replay_queue_service, )

service = get_replay_queue_service() success = service.enqueue_raw(payload, source="my_component")

Status: Public

BackpressureStatus dataclass

BackpressureStatus(
    active: bool = False,
    queue_depth: int = 0,
    max_depth: int = 10000,
    consumer_lag: float = 0.0,
    throttle_percent: float = 0.0,
)

Status of backpressure mechanisms for the replay queue.

Attributes:

Name Type Description
active bool

Whether backpressure is currently being applied.

queue_depth int

Current number of items in the queue.

max_depth int

Maximum queue depth before backpressure activates.

consumer_lag float

Estimated consumer lag in seconds.

throttle_percent float

Current throttle percentage (0-100).

RateLimitStatus dataclass

RateLimitStatus(
    enabled: bool = True,
    current_rate: int = 0,
    limit: int = 1000,
    rejected_count: int = 0,
    window_seconds: int = 60,
)

Status of the rate limiter for the replay queue.

Attributes:

Name Type Description
enabled bool

Whether rate limiting is currently active.

current_rate int

Number of requests accepted in the current window.

limit int

Maximum allowed requests per window.

rejected_count int

Total number of requests rejected due to rate limiting.

window_seconds int

Duration of the sliding window in seconds.

ReplayQueueService

ReplayQueueService(
    max_depth: int = DEFAULT_MAX_DEPTH,
    rate_limit: int = DEFAULT_RATE_LIMIT,
    rate_window_seconds: int = DEFAULT_RATE_WINDOW_SECONDS,
    backpressure_threshold: float = DEFAULT_BACKPRESSURE_THRESHOLD,
    rate_limiting_enabled: bool = True,
)

In-memory replay queue with rate limiting and backpressure.

This service provides a standalone replay queue that works without external dependencies (Redis, Kafka, etc.). It is designed for environments where the full replay infrastructure may not be available, such as chaos experiments and testing scenarios.

Thread-safety

All public methods are protected by a threading.Lock, making this class safe for concurrent use from multiple threads.

Fail-safe defaults
  • Rate limiting enabled with generous defaults (1000 req/60s)
  • Max queue depth of 10,000 to prevent unbounded memory growth
  • Backpressure activates at 80% queue capacity
  • All operations degrade gracefully on internal errors

Initialize the replay queue service.

Parameters:

Name Type Description Default
max_depth int

Maximum queue depth before rejecting new items.

DEFAULT_MAX_DEPTH
rate_limit int

Maximum number of enqueues per window.

DEFAULT_RATE_LIMIT
rate_window_seconds int

Sliding window duration in seconds.

DEFAULT_RATE_WINDOW_SECONDS
backpressure_threshold float

Queue fullness ratio (0.0-1.0) at which backpressure activates.

DEFAULT_BACKPRESSURE_THRESHOLD
rate_limiting_enabled bool

Whether to enforce rate limiting.

True

enqueue_raw

enqueue_raw(
    payload: bytes, source: str = "unknown"
) -> bool

Enqueue a raw payload for replay processing.

Parameters:

Name Type Description Default
payload bytes

Raw bytes payload to enqueue.

required
source str

Identifier of the component that produced the payload.

'unknown'

Returns:

Type Description
bool

True if the payload was accepted, False if rejected due to

bool

queue depth limit or rate limiting.

dequeue

dequeue() -> _QueueEntry | None

Dequeue the next entry for processing.

Returns:

Type Description
_QueueEntry | None

The next queue entry, or None if the queue is empty.

get_rate_limit_status

get_rate_limit_status() -> RateLimitStatus

Get current rate limiting status.

Returns:

Type Description
RateLimitStatus

RateLimitStatus with current rate limiting metrics.

get_backpressure_status

get_backpressure_status() -> BackpressureStatus

Get current backpressure status.

Returns:

Type Description
BackpressureStatus

BackpressureStatus with current queue depth and throttle metrics.

get_queue_depth

get_queue_depth() -> int

Get the current number of items in the queue.

Returns:

Type Description
int

Current queue depth.

get_stats

get_stats() -> dict[str, Any]

Get overall queue statistics.

Returns:

Type Description
dict[str, Any]

Dict with enqueue/dequeue counts, depth, and status info.

get_replay_queue_service

get_replay_queue_service() -> ReplayQueueService

Get the singleton ReplayQueueService instance.

Thread-safe lazy initialization. Creates the service with default configuration on first access.

Returns:

Type Description
ReplayQueueService

The singleton ReplayQueueService instance.

reset_replay_queue_service

reset_replay_queue_service() -> None

Reset the singleton instance (for testing).

This clears the singleton so the next call to get_replay_queue_service() creates a fresh instance.