baldur_pro.services.replay — Replay Queue
Backpressure-aware replay of stored failures: ReplayQueueService with its
BackpressureStatus and RateLimitStatus signals.
🔒 PRO Feature — requires a baldur-pro license
These symbols ship in the baldur-pro distribution. PRO modules import
normally — there is no ImportError. PRO features activate only when
baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system
runs with OSS defaults and register_pro_services() logs
entitlement.pro_registration_skipped.
replay
Replay Queue Package.
Provides replay queue infrastructure with rate limiting and backpressure support for the baldur framework.
Usage
from baldur_pro.services.replay import ( get_replay_queue_service, reset_replay_queue_service, )
service = get_replay_queue_service() success = service.enqueue_raw(payload, source="my_component")
Status: Public
BackpressureStatus
dataclass
BackpressureStatus(
active: bool = False,
queue_depth: int = 0,
max_depth: int = 10000,
consumer_lag: float = 0.0,
throttle_percent: float = 0.0,
)
Status of backpressure mechanisms for the replay queue.
Attributes:
| Name | Type | Description |
|---|---|---|
active |
bool
|
Whether backpressure is currently being applied. |
queue_depth |
int
|
Current number of items in the queue. |
max_depth |
int
|
Maximum queue depth before backpressure activates. |
consumer_lag |
float
|
Estimated consumer lag in seconds. |
throttle_percent |
float
|
Current throttle percentage (0-100). |
RateLimitStatus
dataclass
RateLimitStatus(
enabled: bool = True,
current_rate: int = 0,
limit: int = 1000,
rejected_count: int = 0,
window_seconds: int = 60,
)
Status of the rate limiter for the replay queue.
Attributes:
| Name | Type | Description |
|---|---|---|
enabled |
bool
|
Whether rate limiting is currently active. |
current_rate |
int
|
Number of requests accepted in the current window. |
limit |
int
|
Maximum allowed requests per window. |
rejected_count |
int
|
Total number of requests rejected due to rate limiting. |
window_seconds |
int
|
Duration of the sliding window in seconds. |
ReplayQueueService
ReplayQueueService(
max_depth: int = DEFAULT_MAX_DEPTH,
rate_limit: int = DEFAULT_RATE_LIMIT,
rate_window_seconds: int = DEFAULT_RATE_WINDOW_SECONDS,
backpressure_threshold: float = DEFAULT_BACKPRESSURE_THRESHOLD,
rate_limiting_enabled: bool = True,
)
In-memory replay queue with rate limiting and backpressure.
This service provides a standalone replay queue that works without external dependencies (Redis, Kafka, etc.). It is designed for environments where the full replay infrastructure may not be available, such as chaos experiments and testing scenarios.
Thread-safety
All public methods are protected by a threading.Lock, making this class safe for concurrent use from multiple threads.
Fail-safe defaults
- Rate limiting enabled with generous defaults (1000 req/60s)
- Max queue depth of 10,000 to prevent unbounded memory growth
- Backpressure activates at 80% queue capacity
- All operations degrade gracefully on internal errors
Initialize the replay queue service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_depth
|
int
|
Maximum queue depth before rejecting new items. |
DEFAULT_MAX_DEPTH
|
rate_limit
|
int
|
Maximum number of enqueues per window. |
DEFAULT_RATE_LIMIT
|
rate_window_seconds
|
int
|
Sliding window duration in seconds. |
DEFAULT_RATE_WINDOW_SECONDS
|
backpressure_threshold
|
float
|
Queue fullness ratio (0.0-1.0) at which backpressure activates. |
DEFAULT_BACKPRESSURE_THRESHOLD
|
rate_limiting_enabled
|
bool
|
Whether to enforce rate limiting. |
True
|
enqueue_raw
enqueue_raw(
payload: bytes, source: str = "unknown"
) -> bool
Enqueue a raw payload for replay processing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
payload
|
bytes
|
Raw bytes payload to enqueue. |
required |
source
|
str
|
Identifier of the component that produced the payload. |
'unknown'
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the payload was accepted, False if rejected due to |
bool
|
queue depth limit or rate limiting. |
dequeue
dequeue() -> _QueueEntry | None
Dequeue the next entry for processing.
Returns:
| Type | Description |
|---|---|
_QueueEntry | None
|
The next queue entry, or None if the queue is empty. |
get_rate_limit_status
get_rate_limit_status() -> RateLimitStatus
Get current rate limiting status.
Returns:
| Type | Description |
|---|---|
RateLimitStatus
|
RateLimitStatus with current rate limiting metrics. |
get_backpressure_status
get_backpressure_status() -> BackpressureStatus
Get current backpressure status.
Returns:
| Type | Description |
|---|---|
BackpressureStatus
|
BackpressureStatus with current queue depth and throttle metrics. |
get_queue_depth
get_queue_depth() -> int
Get the current number of items in the queue.
Returns:
| Type | Description |
|---|---|
int
|
Current queue depth. |
get_stats
get_stats() -> dict[str, Any]
Get overall queue statistics.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict with enqueue/dequeue counts, depth, and status info. |
get_replay_queue_service
get_replay_queue_service() -> ReplayQueueService
Get the singleton ReplayQueueService instance.
Thread-safe lazy initialization. Creates the service with default configuration on first access.
Returns:
| Type | Description |
|---|---|
ReplayQueueService
|
The singleton ReplayQueueService instance. |
reset_replay_queue_service
reset_replay_queue_service() -> None
Reset the singleton instance (for testing).
This clears the singleton so the next call to get_replay_queue_service() creates a fresh instance.