baldur_pro.services.bulkhead — Bulkhead
Concurrency isolation primitives: BulkheadPolicy, the semaphore and
thread-pool bulkheads, and the @bulkhead decorator.
🔒 PRO Feature — requires a baldur-pro license
These symbols ship in the baldur-pro distribution. PRO modules import
normally — there is no ImportError. PRO features activate only when
baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system
runs with OSS defaults and register_pro_services() logs
entitlement.pro_registration_skipped.
bulkhead
Bulkhead Pattern - prevent cascading failures through resource isolation.
The Bulkhead pattern, originating from ship design, isolates resources so that a failure in one component does not propagate to others.
Key components:
- SemaphoreBulkhead: semaphore-based concurrency limit (I/O bound)
- AsyncSemaphoreBulkhead: async semaphore-based bulkhead
- ThreadPoolBulkhead: thread-pool-based isolation (CPU bound)
- BulkheadRegistry: per-domain bulkhead management
- @bulkhead: sync/async auto-dispatching decorator
Usage
    from baldur_pro.services.bulkhead import (
        SemaphoreBulkhead,
        bulkhead,
        get_bulkhead_registry,
    )
    from baldur.core.connection_health import ConnectionType

    # Direct use (named domain_bulkhead so the imported bulkhead decorator is not shadowed)
    domain_bulkhead = SemaphoreBulkhead("my_domain", max_concurrent=10)
    with domain_bulkhead.acquire(timeout=5.0):
        do_work()

    # Registry use
    registry = get_bulkhead_registry()
    db_bulkhead = registry.get(ConnectionType.DATABASE)
    with db_bulkhead.acquire():
        db_operation()

    # Decorator use
    @bulkhead(ConnectionType.DATABASE)
    def db_operation():
        pass

    @bulkhead(ConnectionType.DATABASE)
    async def async_db_operation():
        pass
Status: Public
AsyncSemaphoreBulkhead
AsyncSemaphoreBulkhead(name: str, max_concurrent: int = 10)
Async semaphore-based bulkhead.
Limits concurrency in an asyncio environment to prevent resource exhaustion. Does not block the event loop; suited to async I/O work.
Features:
- asyncio.Semaphore-based non-blocking wait
- asyncio.wait_for-based timeout
- Rejection statistics tracking
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Bulkhead name (domain identifier) | required |
| max_concurrent | int | Maximum concurrent executions | 10 |
name
property
name: str
Bulkhead name.
acquire
async
acquire(
timeout: float | None = None,
) -> AsyncGenerator[None, None]
Acquire a resource asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Wait timeout (seconds). If None, fail immediately (non-blocking). | None |

Yields:
| Type | Description |
|---|---|
| AsyncGenerator[None, None] | None |

Raises:
| Type | Description |
|---|---|
| BulkheadFullError | When resource acquisition fails |
try_acquire
async
try_acquire(timeout: float | None = None) -> bool
Acquisition attempt, mirroring this class's acquire timeout contract.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Maximum time (seconds) to wait for capacity: an upper bound on waiting, not a guarantee of it. None means no waiting (immediate verdict). | None |

Returns:
| Type | Description |
|---|---|
| bool | True on success, False on failure |
release
async
release() -> None
Release the resource.
get_state
get_state() -> BulkheadState
Return the current state (synchronous method).
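The asyncio.Semaphore plus asyncio.wait_for combination described for AsyncSemaphoreBulkhead can be illustrated with a minimal, self-contained sketch. It is not the baldur-pro implementation: SketchAsyncBulkhead and SketchBulkheadFull are hypothetical stand-ins, and the rejection counting is an assumption about how the statistics might be kept.

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager


class SketchBulkheadFull(Exception):
    """Hypothetical stand-in for BulkheadFullError."""


class SketchAsyncBulkhead:
    """Illustrative only; names and internals are assumptions."""

    def __init__(self, name: str, max_concurrent: int = 10) -> None:
        self.name = name
        self._sem = asyncio.Semaphore(max_concurrent)
        self.rejected_count = 0

    async def try_acquire(self, timeout: float | None = None) -> bool:
        if timeout is None:
            # No waiting: take a permit only if one is free right now.
            if self._sem.locked():
                self.rejected_count += 1
                return False
            await self._sem.acquire()  # completes without suspending
            return True
        try:
            # Wait for a permit, but never longer than `timeout` seconds.
            await asyncio.wait_for(self._sem.acquire(), timeout)
            return True
        except asyncio.TimeoutError:
            self.rejected_count += 1
            return False

    async def release(self) -> None:
        self._sem.release()

    @asynccontextmanager
    async def acquire(self, timeout: float | None = None):
        if not await self.try_acquire(timeout):
            raise SketchBulkheadFull(self.name)
        try:
            yield
        finally:
            await self.release()
```

With max_concurrent=1, a second try_acquire() made while the permit is held returns False immediately when timeout is None, and returns False after at most the given timeout otherwise.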
Bulkhead
Bases: ABC
Bulkhead abstract interface.
Prevents a failure in one component from propagating to other components through resource isolation.
Usage (context manager):
    bulkhead = SemaphoreBulkhead("database", max_concurrent=10)
    with bulkhead.acquire(timeout=5.0):
        do_database_work()
Usage (decorator):
    @bulkhead.wrap
    def do_work():
        pass
name
abstractmethod
property
name: str
Bulkhead name (domain identifier).
acquire
abstractmethod
acquire(
timeout: float | None = None,
) -> Generator[None, None, None]
Acquire the resource (context manager).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Wait timeout (seconds). If None, fail immediately (non-blocking). | None |

Yields:
| Type | Description |
|---|---|
| None | None |

Raises:
| Type | Description |
|---|---|
| BulkheadFullError | When resource acquisition fails |
try_acquire
abstractmethod
try_acquire(timeout: float | None = None) -> bool
Attempt to acquire the resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Maximum time (seconds) the implementation may wait for capacity: an upper bound on waiting, not a guarantee of it. None means no waiting (immediate verdict). Implementations whose capacity model already buffers bursts (bounded-queue thread pools) may return an immediate verdict for any timeout value. | None |

Returns:
| Type | Description |
|---|---|
| bool | True if acquisition succeeded, False if it failed |
release
abstractmethod
release() -> None
Release the resource.
get_state
abstractmethod
get_state() -> BulkheadState
Return the current state.
wrap
wrap(fn: Callable[..., T]) -> Callable[..., T]
Decorator that wraps a function with the bulkhead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[..., T] | Function to wrap | required |

Returns:
| Type | Description |
|---|---|
| Callable[..., T] | Function with the bulkhead applied |
BulkheadState
dataclass
BulkheadState(
name: str,
bulkhead_type: BulkheadType,
max_concurrent: int,
active_count: int,
waiting_count: int,
rejected_count: int,
last_rejection_time: datetime | None = None,
)
Bulkhead current state data.
name
instance-attribute
name: str
Bulkhead name (domain identifier)
bulkhead_type
instance-attribute
bulkhead_type: BulkheadType
Bulkhead type
max_concurrent
instance-attribute
max_concurrent: int
Maximum allowed concurrent execution count
active_count
instance-attribute
active_count: int
Number of currently running tasks
waiting_count
instance-attribute
waiting_count: int
Number of waiting tasks
rejected_count
instance-attribute
rejected_count: int
Total number of rejected requests
last_rejection_time
class-attribute
instance-attribute
last_rejection_time: datetime | None = None
Last rejection time
available_permits
property
available_permits: int
Number of available permits.
utilization_percent
property
utilization_percent: float
Utilization (0-100%).
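The two derived properties can be pictured with a small dataclass. The formulas below are assumptions inferred from the field names (free permits as capacity minus active work, utilization as the active share of capacity); the source does not state how BulkheadState computes them, and SketchState is a hypothetical name.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SketchState:
    """Hypothetical mirror of two BulkheadState fields."""

    max_concurrent: int
    active_count: int

    @property
    def available_permits(self) -> int:
        # Assumed formula: free slots, clamped so it is never negative.
        return max(0, self.max_concurrent - self.active_count)

    @property
    def utilization_percent(self) -> float:
        # Assumed formula: active share of capacity on a 0-100 scale.
        if self.max_concurrent <= 0:
            return 0.0
        return min(100.0, 100.0 * self.active_count / self.max_concurrent)


state = SketchState(max_concurrent=10, active_count=4)
print(state.available_permits, state.utilization_percent)
```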
BulkheadType
Bases: str, Enum
Bulkhead type.
SEMAPHORE
class-attribute
instance-attribute
SEMAPHORE = 'semaphore'
Semaphore-based - limits only the concurrent execution count
THREAD_POOL
class-attribute
instance-attribute
THREAD_POOL = 'thread_pool'
Thread-pool-based - isolated execution in a dedicated thread pool
BulkheadError
BulkheadError(message: str = '', *, code: str = '')
Bases: ResilienceError
Base exception for bulkhead-related failures.
extra_context
extra_context() -> dict
Return structlog-bindable context for bulkhead errors.
BulkheadFullError
BulkheadFullError(
bulkhead_name: str,
max_concurrent: int,
active_count: int,
)
Bases: PolicyRejectedException, BulkheadError
Raised when the bulkhead has no permits available and a new call is rejected.
Multi-inherits PolicyRejectedException so that the outer
PolicyComposer catch hierarchy classifies bulkhead rejections as
PolicyOutcome.REJECTED instead of letting them fall through to the generic
except Exception branch, which would mislabel them as FAILURE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | Bulkhead identifier | required |
| max_concurrent | int | Maximum allowed concurrent executions | required |
| active_count | int | Number of executions currently active | required |
extra_context
extra_context() -> dict
Return structlog-bindable context for bulkhead full errors.
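The effect of the multiple inheritance on classification can be shown with a self-contained sketch. Every class here is a hypothetical stand-in (Outcome for PolicyOutcome, SketchPolicyRejected for PolicyRejectedException, and so on), and classify() only imitates the ordering of except clauses that the description implies; it is not the PolicyComposer code.

```python
from __future__ import annotations

from enum import Enum
from typing import Callable


class Outcome(Enum):
    """Hypothetical stand-in for PolicyOutcome."""

    SUCCESS = "success"
    REJECTED = "rejected"
    FAILURE = "failure"


class SketchPolicyRejected(Exception):
    """Stand-in for PolicyRejectedException."""


class SketchBulkheadError(Exception):
    """Stand-in for BulkheadError."""


class SketchBulkheadFull(SketchPolicyRejected, SketchBulkheadError):
    """Matches handlers written for either parent class."""


def classify(fn: Callable[[], object]) -> Outcome:
    try:
        fn()
    except SketchPolicyRejected:
        # The specific branch comes first, so rejections land here.
        return Outcome.REJECTED
    except Exception:
        return Outcome.FAILURE
    return Outcome.SUCCESS


def raiser(exc: Exception) -> Callable[[], None]:
    """Build a zero-argument callable that raises `exc`."""

    def thrower() -> None:
        raise exc

    return thrower
```

An exception that inherits only from the bulkhead base would reach the generic branch and be reported as a failure, which is the mislabeling the multiple inheritance avoids.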
BulkheadNotFoundError
BulkheadNotFoundError(
bulkhead_name: str, registered_names: list[str]
)
Bases: BulkheadError, KeyError
Raised when a bulkhead is requested for a domain that has not been provisioned.
Custom domains must be provisioned (via register(), get_or_create(),
or a policy/settings helper) before a @bulkhead-decorated function is
first called, on both sync and async callees.
Multi-inherits KeyError so the registry's long-standing not-found contract
stays intact for existing except KeyError consumers (the traffic-gate
skip-degradation path, the admin API 404 mapping), while except BulkheadError
/ except BaldurError also classify it. Multi-inheritance precedent:
BulkheadFullError(PolicyRejectedException, BulkheadError).
__str__ is overridden because KeyError.__str__ would otherwise
repr-quote the message (the base hierarchy defines no __str__), mangling
the actionable text into "Bulkhead not found: ..." with surrounding quotes.
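The quoting behaviour that motivates the __str__ override is standard Python and can be reproduced without baldur-pro. PlainNotFound and ReadableNotFound are hypothetical classes used only for this demonstration; the override shown is one possible way to return the message unquoted, not necessarily the one the library uses.

```python
class PlainNotFound(KeyError):
    """Inherits KeyError.__str__, which repr-quotes a single argument."""


class ReadableNotFound(KeyError):
    """Overrides __str__ so the message is returned unquoted."""

    def __str__(self) -> str:
        return str(self.args[0]) if self.args else ""


message = "Bulkhead not found: reports"
quoted = str(PlainNotFound(message))
readable = str(ReadableNotFound(message))

print(quoted)    # 'Bulkhead not found: reports'  (with the quotes)
print(readable)  # Bulkhead not found: reports
```

Both classes remain KeyError subclasses, so existing except KeyError handlers keep working.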
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | The requested, unregistered domain | required |
| registered_names | list[str] | Currently registered compartment names | required |
extra_context
extra_context() -> dict
Return structlog-bindable context for bulkhead not-found errors.
BulkheadTimeoutError
BulkheadTimeoutError(bulkhead_name: str, timeout: float)
Bases: TimeoutPolicyError, BulkheadError
Raised when a bulkhead-managed call exceeds its timeout budget.
Emitted by ThreadPoolBulkhead when the wrapped task does not complete
within the configured timeout. Multi-inherits TimeoutPolicyError so
the outer PolicyComposer catch hierarchy classifies the failure as
PolicyOutcome.TIMEOUT (instead of funneling into the generic
except Exception branch as FAILURE).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | Bulkhead identifier | required |
| timeout | float | Configured timeout in seconds | required |
extra_context
extra_context() -> dict
Return structlog-bindable context for bulkhead timeout errors.
BulkheadMetricsUpdater
BulkheadMetricsUpdater(interval: float = 10.0)
Background thread that periodically refreshes every bulkhead's metrics.
Pulls the current state of every registered bulkhead and writes it into the Prometheus metrics on a schedule.
Usage
    updater = BulkheadMetricsUpdater(interval=10.0)
    updater.start()
    # ... application runs ...
    updater.stop()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| interval | float | Metric update interval (seconds) | 10.0 |
start
start() -> None
Start the metrics updater.
stop
stop() -> None
Stop the metrics updater.
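A periodic background refresher of this kind is commonly built from a daemon thread and a threading.Event. The sketch below shows that general pattern under stated assumptions: SketchUpdater and its refresh callback are hypothetical, and nothing here is taken from the BulkheadMetricsUpdater source.

```python
from __future__ import annotations

import threading
from typing import Callable


class SketchUpdater:
    """Illustrative periodic runner; not the library class."""

    def __init__(self, refresh: Callable[[], object], interval: float = 10.0) -> None:
        self._refresh = refresh
        self._interval = interval
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Event.wait doubles as an interruptible sleep: it returns True
        # as soon as stop() sets the event, which ends the loop.
        while not self._stop.wait(self._interval):
            self._refresh()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None
```

Using Event.wait() rather than time.sleep() lets stop() return promptly instead of waiting out the remainder of an interval.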
AsyncBulkheadPolicy
AsyncBulkheadPolicy(
async_bulkhead: AsyncSemaphoreBulkhead,
timeout: float | None = None,
)
Asynchronous Bulkhead Policy — AsyncResiliencePolicy Protocol implementation.
Wraps AsyncSemaphoreBulkhead and returns results in PolicyResult form. Follows the same exception handling contract as the synchronous BulkheadPolicy.
Because AsyncSemaphoreBulkhead is a separate class that does not inherit from Bulkhead(ABC), this policy is implemented as a class fully separate from the synchronous BulkheadPolicy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| async_bulkhead | AsyncSemaphoreBulkhead | AsyncSemaphoreBulkhead instance (DI). | required |
| timeout | float \| None | Resource acquisition timeout (seconds). If None, fail immediately. | None |
name
property
name: str
Policy name.
bulkhead_name
property
bulkhead_name: str
Domain identifier of the internal AsyncSemaphoreBulkhead.
execute
async
execute(
func: Callable[..., T],
*args: Any,
context: PolicyContext | None = None,
**kwargs: Any
) -> PolicyResult[T]
Execute the function within the asynchronous bulkhead resource.
BulkheadFullError → absorbed into PolicyResult(outcome=REJECTED). Business exceptions during function execution are re-propagated via raise.
BulkheadPolicy
BulkheadPolicy(
bulkhead: Bulkhead, timeout: float | None = None
)
Bases: ResiliencePolicy[T]
Synchronous Bulkhead Policy — resource isolation.
Internally reuses the existing Bulkhead ABC implementations. SemaphoreBulkhead runs via the acquire() context manager, and ThreadPoolBulkhead runs via the execute() method.
Exception handling contract (same as CircuitBreakerPolicy):
- BulkheadFullError → absorbed into PolicyResult(outcome=REJECTED)
- BulkheadTimeoutError → absorbed into PolicyResult(outcome=TIMEOUT) (ThreadPool only)
- Business exception during function execution → re-propagated via raise
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead | Bulkhead | Bulkhead ABC implementation (SemaphoreBulkhead, ThreadPoolBulkhead). Injected via DI. Registry lookup uses the bulkhead_policy() factory. | required |
| timeout | float \| None | Resource acquisition timeout (seconds). If None, fail immediately (Fast Fail). For ThreadPoolBulkhead, used as the execute() execution timeout. | None |
name
property
name: str
Policy name.
bulkhead_name
property
bulkhead_name: str
Domain identifier of the internal Bulkhead instance.
execute
execute(
func: Callable[..., T],
*args: Any,
context: PolicyContext | None = None,
**kwargs: Any
) -> PolicyResult[T]
Execute the function within the bulkhead resource.
For ThreadPoolBulkhead:
- isinstance check → call ThreadPoolBulkhead.execute()
- leverages dedicated thread-pool isolation + ContextVar propagation
- BulkheadTimeoutError → mapped to PolicyOutcome.TIMEOUT

For SemaphoreBulkhead:
- with acquire(timeout) context manager
- BulkheadFullError → mapped to PolicyOutcome.REJECTED
Business exceptions during function execution are re-propagated via raise.
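The contract above (rejections absorbed into a result object, business exceptions re-raised) can be sketched for the semaphore case as follows. SketchPolicy and SketchResult are hypothetical names, the string outcomes stand in for PolicyOutcome members, and a plain threading.Semaphore takes the place of a Bulkhead instance.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SketchResult:
    """Hypothetical stand-in for PolicyResult."""

    outcome: str  # "success" or "rejected"
    value: Any = None


class SketchPolicy:
    """Illustrative only; a semaphore stands in for the bulkhead."""

    def __init__(self, max_concurrent: int, timeout: float | None = None) -> None:
        self._sem = threading.Semaphore(max_concurrent)
        self._timeout = timeout

    def execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> SketchResult:
        if self._timeout is None:
            acquired = self._sem.acquire(blocking=False)  # fail fast
        else:
            acquired = self._sem.acquire(timeout=self._timeout)
        if not acquired:
            # Rejection is reported in the result rather than raised.
            return SketchResult("rejected")
        try:
            # Exceptions raised by func propagate to the caller unchanged.
            return SketchResult("success", func(*args, **kwargs))
        finally:
            self._sem.release()
```

Returning a result for rejections while letting business exceptions propagate keeps capacity signals separate from application errors.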
BulkheadRegistry
BulkheadRegistry(settings: BulkheadSettings | None = None)
Per-domain bulkhead registry.
Integrates with the existing ConnectionType and also supports custom domains. Reflects runtime configuration changes by subscribing to the CONFIG_UPDATED event.
Features:
- Automatic registration of default bulkheads per ConnectionType
- Custom domain support
- Automatic creation of asynchronous bulkheads
- Fine-grained bulkheads per DB alias / cache instance
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | BulkheadSettings \| None | Bulkhead settings. Uses defaults if None. | None |
close
close() -> None
Unsubscribe EventBus handlers.
Idempotent: safe to call multiple times.
get
get(name: str | ConnectionType) -> Bulkhead
Look up a bulkhead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| ConnectionType | Domain name or ConnectionType | required |

Returns:
| Type | Description |
|---|---|
| Bulkhead | Bulkhead instance |

Raises:
| Type | Description |
|---|---|
| BulkheadNotFoundError | Unregistered domain. Subclasses |
get_or_create
get_or_create(
name: str,
max_concurrent: int | None = None,
bulkhead_type: str = "semaphore",
) -> Bulkhead
Look up or create a bulkhead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Domain name | required |
| max_concurrent | int \| None | Maximum concurrent execution count (default if None) | None |
| bulkhead_type | str | "semaphore" or "thread_pool" | 'semaphore' |

Returns:
| Type | Description |
|---|---|
| Bulkhead | Bulkhead instance |
get_async
get_async(
name: str | ConnectionType,
) -> AsyncSemaphoreBulkhead
Look up an asynchronous bulkhead.
Returns the asynchronous bulkhead for the domain, creating it on first
access with its capacity derived from the synchronous twin. The lookup is
strict: a domain with no synchronous twin is treated exactly as in get(),
so provisioning must precede the async lookup. As a result, an async
bulkhead can never be created with default capacity outside the registry's view.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| ConnectionType | Domain name or ConnectionType | required |

Returns:
| Type | Description |
|---|---|
| AsyncSemaphoreBulkhead | AsyncSemaphoreBulkhead instance |

Raises:
| Type | Description |
|---|---|
| BulkheadNotFoundError | No synchronous twin for |
get_for_database
get_for_database(alias: str = 'default') -> Bulkhead
Return the bulkhead for a DB alias.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| alias | str | Django DB alias (default, replica, analytics, etc.) | 'default' |

Returns:
| Type | Description |
|---|---|
| Bulkhead | The bulkhead for the given alias |
get_for_cache
get_for_cache(name: str = 'default') -> Bulkhead
Return the bulkhead for a cache instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Cache name (default, session, etc.) | 'default' |

Returns:
| Type | Description |
|---|---|
| Bulkhead | The bulkhead for the given cache |
register
register(bulkhead: Bulkhead) -> None
Register a custom bulkhead.
Overwriting a built-in name (one of the four ConnectionType values) is allowed: it is the only way to swap a built-in's implementation type, and the re-register capacity flow depends on it. The override is transient, however. The next config reload replaces it with the settings-derived built-in, so a WARNING is emitted to flag the risk.
The async twin for the same name is invalidated so async callees pick up the new capacity on next access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead | Bulkhead | Bulkhead to register | required |
unregister
unregister(name: str) -> bool
Unregister a bulkhead.
The async twin for the same name is invalidated alongside the sync entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Domain name | required |

Returns:
| Type | Description |
|---|---|
| bool | True if unregistration succeeded, False if the name was not registered |

Raises:
| Type | Description |
|---|---|
| ValueError | |
get_all_states
get_all_states() -> dict[str, BulkheadState]
Return all bulkhead states.
list_names
list_names() -> list[str]
Return all registered domain names.
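The registry rules described in this section (strict lookups, async twins derived from the sync entry, twin invalidation on register and unregister) can be modelled in a few lines. This is a deliberately simplified sketch: SketchRegistry and SketchNotFound are hypothetical, and plain integer capacities stand in for real bulkhead objects.

```python
from __future__ import annotations


class SketchNotFound(KeyError):
    """Hypothetical stand-in for BulkheadNotFoundError."""


class SketchRegistry:
    """Capacities (ints) stand in for bulkhead instances."""

    def __init__(self, default_capacity: int = 10) -> None:
        self._default = default_capacity
        self._sync: dict[str, int] = {}
        self._async: dict[str, int] = {}  # async twins, built lazily

    def get_or_create(self, name: str, max_concurrent: int | None = None) -> int:
        if name not in self._sync:
            capacity = self._default if max_concurrent is None else max_concurrent
            self._sync[name] = capacity
        return self._sync[name]

    def get(self, name: str) -> int:
        try:
            return self._sync[name]
        except KeyError:
            raise SketchNotFound(name) from None

    def get_async(self, name: str) -> int:
        capacity = self.get(name)  # strict: no sync twin means not found
        return self._async.setdefault(name, capacity)

    def register(self, name: str, capacity: int) -> None:
        self._sync[name] = capacity
        self._async.pop(name, None)  # invalidate the stale async twin

    def unregister(self, name: str) -> bool:
        self._async.pop(name, None)
        return self._sync.pop(name, None) is not None
```

Because get_async() goes through get(), an unprovisioned domain fails the same way on both paths, and re-registering a name causes the next async lookup to pick up the new capacity.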
SemaphoreBulkhead
SemaphoreBulkhead(
name: str, max_concurrent: int = 10, fair: bool = True
)
Bases: Bulkhead
Semaphore-based bulkhead.
Limits the concurrent execution count to prevent resource exhaustion. Suitable for I/O-bound work (DB queries, cache lookups, etc.).
Features:
- Maximum concurrent execution count limit
- Timeout-based wait support
- Rejection statistics tracking
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Bulkhead name (domain identifier) | required |
| max_concurrent | int | Maximum concurrent execution count | 10 |
| fair | bool | If True, guarantees FIFO ordering (reserved for a future implementation) | True |
name
property
name: str
Bulkhead name.
acquire
acquire(
timeout: float | None = None,
) -> Generator[None, None, None]
Acquire the resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Wait timeout (seconds). If None, fail immediately (non-blocking). | None |

Yields:
| Type | Description |
|---|---|
| None | None |

Raises:
| Type | Description |
|---|---|
| BulkheadFullError | When resource acquisition fails |
try_acquire
try_acquire(timeout: float | None = None) -> bool
Attempt to acquire. Non-blocking if timeout is None, otherwise waits up to the given time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Wait timeout (seconds). If None, succeed/fail immediately. | None |

Returns:
| Type | Description |
|---|---|
| bool | True if acquisition succeeded, False if it failed |
release
release() -> None
Release the resource.
get_state
get_state() -> BulkheadState
Return the current state.
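The semaphore-based behaviour documented for this class (non-blocking when timeout is None, bounded wait otherwise, rejection counting) can be sketched with the standard library alone. SketchSemaphoreBulkhead and SketchBulkheadFull are hypothetical names, and the counters are an assumption about what the state snapshot might track.

```python
from __future__ import annotations

import threading
from contextlib import contextmanager
from typing import Iterator


class SketchBulkheadFull(Exception):
    """Hypothetical stand-in for BulkheadFullError."""


class SketchSemaphoreBulkhead:
    """Illustrative only; not the baldur-pro implementation."""

    def __init__(self, name: str, max_concurrent: int = 10) -> None:
        self.name = name
        self.max_concurrent = max_concurrent
        self._sem = threading.BoundedSemaphore(max_concurrent)
        self._lock = threading.Lock()  # guards the counters below
        self.active_count = 0
        self.rejected_count = 0

    def try_acquire(self, timeout: float | None = None) -> bool:
        if timeout is None:
            ok = self._sem.acquire(blocking=False)
        else:
            ok = self._sem.acquire(timeout=timeout)
        with self._lock:
            if ok:
                self.active_count += 1
            else:
                self.rejected_count += 1
        return ok

    def release(self) -> None:
        with self._lock:
            self.active_count -= 1
        self._sem.release()

    @contextmanager
    def acquire(self, timeout: float | None = None) -> Iterator[None]:
        if not self.try_acquire(timeout):
            raise SketchBulkheadFull(self.name)
        try:
            yield
        finally:
            self.release()  # the permit is returned even if the body raises
```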
ThreadPoolBulkhead
ThreadPoolBulkhead(
name: str,
max_workers: int = 5,
queue_size: int = 10,
thread_name_prefix: str | None = None,
)
Bases: Bulkhead
Thread-pool-based bulkhead.
Provides complete isolation by running tasks in a dedicated thread pool. Request ID, tracing context, etc. are propagated to the worker thread via contextvars.copy_context().
Features:
- Complete isolation with a dedicated thread pool
- Automatic ContextVar propagation (test mode, request overrides, etc.)
- Wait queue size limit
- Timeout support
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Bulkhead name (domain identifier) | required |
| max_workers | int | Maximum number of worker threads | 5 |
| queue_size | int | Wait queue size (requests are rejected when exceeded) | 10 |
| thread_name_prefix | str \| None | Thread name prefix | None |
name
property
name: str
Bulkhead name.
acquire
acquire(
timeout: float | None = None,
) -> Generator[None, None, None]
For ThreadPoolBulkhead, prefer submit() or execute(). This method is provided only for Bulkhead interface compatibility.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Unused (interface compatibility) | None |

Raises:
| Type | Description |
|---|---|
| BulkheadFullError | When the queue is full |
try_acquire
try_acquire(timeout: float | None = None) -> bool
Acquisition attempt with an immediate verdict.
The bounded wait queue (max_workers + queue_size) is already the
burst buffer, so the verdict is immediate for any timeout value —
timeout is accepted for contract parity but not waited on. Since
timeout is an upper bound on waiting, zero waiting satisfies it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Upper bound on waiting (seconds). Accepted for interface parity; this implementation returns an immediate verdict regardless of its value. | None |

Returns:
| Type | Description |
|---|---|
| bool | True if acquisition succeeded, False if the compartment is full |
release
release() -> None
Release the resource.
submit
submit(
fn: Callable[..., T], *args: Any, **kwargs: Any
) -> Future[T]
Submit a task asynchronously (with ContextVar propagation).
Uses contextvars.copy_context() to propagate the current thread's ContextVars to the worker thread. This keeps the test-mode flag, request overrides, etc. preserved within the thread pool.
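The propagation technique named here, contextvars.copy_context() combined with Context.run, can be demonstrated with a plain ThreadPoolExecutor. The request_id variable and the submit_with_context helper are hypothetical names chosen for this sketch; only the standard-library calls are real.

```python
from __future__ import annotations

import contextvars
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

# Hypothetical per-request value that should be visible in worker threads.
request_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="unset"
)


def submit_with_context(
    pool: ThreadPoolExecutor, fn: Callable[..., Any], *args: Any, **kwargs: Any
) -> Future:
    # Snapshot the caller's context at submission time, then run fn inside
    # that snapshot on the worker thread.
    ctx = contextvars.copy_context()
    return pool.submit(ctx.run, fn, *args, **kwargs)


def read_request_id() -> str:
    return request_id.get()


with ThreadPoolExecutor(max_workers=1) as pool:
    request_id.set("req-123")
    propagated = submit_with_context(pool, read_request_id).result()

print(propagated)  # req-123
```

Because the snapshot is a copy, values set by the task inside the worker do not leak back into the submitting thread's context.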
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[..., T] | Function to execute | required |
| \*args | Any | Positional arguments | () |
| \*\*kwargs | Any | Keyword arguments | {} |

Returns:
| Type | Description |
|---|---|
| Future[T] | Future object |

Raises:
| Type | Description |
|---|---|
| BulkheadFullError | When the wait queue is full |
execute
execute(
fn: Callable[..., T],
*args: Any,
timeout: float = 30.0,
**kwargs: Any
) -> T
Execute a task synchronously.
Calls submit() and waits for the result.
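The relationship between submit() and execute(), together with the bounded capacity of max_workers + queue_size, can be sketched as below. All names are hypothetical stand-ins, and using a semaphore to count running plus queued tasks is an assumption made for this sketch, not a description of the library's internals.

```python
from __future__ import annotations

import threading
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable


class SketchFull(Exception):
    """Hypothetical stand-in for BulkheadFullError."""


class SketchTimeout(Exception):
    """Hypothetical stand-in for BulkheadTimeoutError."""


class SketchPoolBulkhead:
    def __init__(self, max_workers: int = 5, queue_size: int = 10) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        # Running plus queued tasks may not exceed this many slots.
        self._slots = threading.BoundedSemaphore(max_workers + queue_size)

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        if not self._slots.acquire(blocking=False):
            raise SketchFull("wait queue is full")  # immediate verdict
        try:
            future = self._pool.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()
            raise
        # Free the slot once the task finishes, whatever its outcome.
        future.add_done_callback(lambda _f: self._slots.release())
        return future

    def execute(
        self, fn: Callable[..., Any], *args: Any, timeout: float = 30.0, **kwargs: Any
    ) -> Any:
        future = self.submit(fn, *args, **kwargs)
        try:
            return future.result(timeout=timeout)
        except FutureTimeout:
            # The task itself keeps running; only the wait is abandoned.
            raise SketchTimeout(f"no result within {timeout}s") from None

    def shutdown(self, wait: bool = True) -> None:
        self._pool.shutdown(wait=wait)
```

In this sketch a timed-out task still occupies its slot until it actually finishes, so a run of slow tasks eventually shows up as rejections rather than as unbounded queue growth.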
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[..., T] | Function to execute | required |
| \*args | Any | Positional arguments | () |
| timeout | float | Execution timeout (seconds) | 30.0 |
| \*\*kwargs | Any | Keyword arguments | {} |

Returns:
| Type | Description |
|---|---|
| T | Function execution result |

Raises:
| Type | Description |
|---|---|
| BulkheadFullError | When the wait queue is full |
| BulkheadTimeoutError | When a timeout occurs |
get_state
get_state() -> BulkheadState
Return the current state.
shutdown
shutdown(wait: bool = True) -> None
Shut down the thread pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| wait | bool | If True, shut down after waiting for pending tasks to complete | True |
bulkhead
bulkhead(
name: str | ConnectionType,
timeout: float | None = None,
fallback: Callable[..., T] | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]
Bulkhead decorator (sync/async auto-dispatching).
Detects the function type via asyncio.iscoroutinefunction() and automatically applies the appropriate bulkhead (sync/async).
Custom domains must be provisioned (via get_or_create(), register(),
or a policy/settings helper) before the decorated function is first called.
An unregistered domain raises BulkheadNotFoundError — naming the registered
compartments — on both sync and async callees; the error is raised before any
fallback is considered, so fallback does not mask it. The four built-in
compartments (database, cache, external_api, message_queue) are
always available.
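The sync/async auto-dispatch can be illustrated with a stripped-down decorator that only records which path it took. The text above names asyncio.iscoroutinefunction(); this sketch uses inspect.iscoroutinefunction, which performs the same kind of check. The dispatching decorator and the paths list are hypothetical, and no bulkhead is actually acquired.

```python
from __future__ import annotations

import asyncio
import functools
import inspect
from typing import Any, Callable

paths: list[str] = []  # records which wrapper handled each call


def dispatching(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: picks an async or sync wrapper at decoration time."""
    if inspect.iscoroutinefunction(fn):

        @functools.wraps(fn)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            paths.append("async")  # an async bulkhead would be acquired here
            return await fn(*args, **kwargs)

        return async_wrapper

    @functools.wraps(fn)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        paths.append("sync")  # a sync bulkhead would be acquired here
        return fn(*args, **kwargs)

    return sync_wrapper


@dispatching
def add(a: int, b: int) -> int:
    return a + b


@dispatching
async def add_async(a: int, b: int) -> int:
    return a + b
```

Deciding once at decoration time keeps the per-call overhead low and ensures the wrapped coroutine function is still recognised as a coroutine function by callers.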
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| ConnectionType | Domain name or ConnectionType | required |
| timeout | float \| None | Resource acquisition wait timeout (seconds). If None, fail immediately. | None |
| fallback | Callable[..., T] \| None | Alternative function to call when the bulkhead is full | None |

Returns:
| Type | Description |
|---|---|
| Callable[[Callable[..., T]], Callable[..., T]] | Decorator with the bulkhead applied |
Examples:
    # Synchronous function (built-in compartment)
    @bulkhead(ConnectionType.DATABASE)
    def db_query():
        return execute_query()

    # Asynchronous function (built-in compartment)
    @bulkhead(ConnectionType.DATABASE)
    async def async_db_query():
        return await execute_async_query()

    # Timeout setting (built-in compartment)
    @bulkhead("external_api", timeout=5.0)
    def call_external_api():
        return requests.get(url)

    # Custom domain with fallback: provision it first, then decorate
    from baldur_pro.services.bulkhead import get_bulkhead_registry
    get_bulkhead_registry().get_or_create("reports", max_concurrent=5)

    @bulkhead("reports", fallback=lambda: {"status": "unavailable"})
    def get_data():
        return fetch_data()
bulkhead_for_cache
bulkhead_for_cache(
name: str = "default",
timeout: float | None = None,
fallback: Callable[..., T] | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]
Per-cache-instance bulkhead decorator.
Internally uses BulkheadPolicy/AsyncBulkheadPolicy and looks up the bulkhead for the given cache via the Registry's get_for_cache().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Cache name (default, session, etc.) | 'default' |
| timeout | float \| None | Resource acquisition wait timeout (seconds) | None |
| fallback | Callable[..., T] \| None | (deprecated) Alternative function to call when the bulkhead is full. Using the BulkheadPolicy + FallbackPolicy combination directly is recommended. | None |
Examples:
    @bulkhead_for_cache("default")
    def get_cached_value(key: str):
        return cache.get(key)

    @bulkhead_for_cache("session")
    def get_session_data(session_id: str):
        return session_cache.get(session_id)
bulkhead_for_database
bulkhead_for_database(
alias: str = "default",
timeout: float | None = None,
fallback: Callable[..., T] | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]
Per-DB-alias bulkhead decorator.
Internally uses BulkheadPolicy/AsyncBulkheadPolicy and looks up the bulkhead for the given alias via the Registry's get_for_database().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| alias | str | Django DB alias (default, replica, analytics, etc.) | 'default' |
| timeout | float \| None | Resource acquisition wait timeout (seconds) | None |
| fallback | Callable[..., T] \| None | (deprecated) Alternative function to call when the bulkhead is full. Using the BulkheadPolicy + FallbackPolicy combination directly is recommended. | None |
Examples:
    @bulkhead_for_database("default")
    def write_to_db():
        Model.objects.using("default").create(...)

    @bulkhead_for_database("replica")
    def read_from_replica():
        return Model.objects.using("replica").all()
get_metrics_updater
get_metrics_updater(
interval: float = 10.0,
) -> BulkheadMetricsUpdater
Return the BulkheadMetricsUpdater singleton.
increment_rejected_count
increment_rejected_count(bulkhead_name: str) -> None
Increment the rejected counter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | Bulkhead name | required |
start_metrics_updater
start_metrics_updater(
interval: float = 10.0,
) -> BulkheadMetricsUpdater
Start the metrics updater (convenience function).
stop_metrics_updater
stop_metrics_updater() -> None
Stop the metrics updater (convenience function).
update_bulkhead_metrics
update_bulkhead_metrics(
bulkhead_name: str,
bulkhead_type: str,
active_count: int,
max_concurrent: int,
waiting_count: int,
rejected_count: int,
) -> None
Update bulkhead metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | Bulkhead name | required |
| bulkhead_type | str | Bulkhead type (semaphore, thread_pool) | required |
| active_count | int | Current number of active requests | required |
| max_concurrent | int | Maximum concurrent capacity | required |
| waiting_count | int | Number of waiting requests | required |
| rejected_count | int | Total rejected requests | required |
bulkhead_operation_span
bulkhead_operation_span(
bulkhead_name: str, operation: str
) -> Generator[dict[str, Any], None, None]
Create a Span for an operation inside a bulkhead (optional).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | Bulkhead name | required |
| operation | str | Operation name (e.g., "db_query", "api_call") | required |

Yields:
| Name | Type | Description |
|---|---|---|
| span_data | dict[str, Any] | Dictionary of data to attach to the Span |
Examples:
    with bulkhead.acquire():
        with bulkhead_operation_span("database", "user_query") as span_data:
            users = User.objects.all()
            span_data["count"] = len(users)
bulkhead_span
bulkhead_span(
bulkhead_name: str,
max_concurrent: int,
timeout: float | None = None,
) -> Generator[dict[str, Any], None, None]
Create an OpenTelemetry Span for a bulkhead operation (optional).
When OTel is disabled, returns only an empty context and works normally.
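The "works normally when tracing is disabled" behaviour can be pictured as a context manager that always yields a dictionary and only touches a tracer when one is supplied. This is a sketch under assumptions: sketch_span is a hypothetical function, and the optional tracer argument is assumed to expose start_as_current_span() and spans with set_attribute(), as OpenTelemetry tracers do.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Iterator, Optional


@contextmanager
def sketch_span(name: str, tracer: Optional[Any] = None) -> Iterator[dict[str, Any]]:
    """Yield a dict for span data; record it only if a tracer is given."""
    span_data: dict[str, Any] = {}
    if tracer is None:
        # Tracing disabled: callers still get a dict to write into.
        yield span_data
        return
    with tracer.start_as_current_span(name) as span:
        try:
            yield span_data
        finally:
            # Copy whatever the caller collected onto the span.
            for key, value in span_data.items():
                span.set_attribute(key, value)


with sketch_span("bulkhead.database") as data:
    data["result_size"] = 3
```

Yielding the same dictionary shape in both modes means calling code needs no conditional on whether tracing is enabled.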
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bulkhead_name | str | Bulkhead name | required |
| max_concurrent | int | Maximum concurrent executions | required |
| timeout | float \| None | Timeout setting | None |

Yields:
| Name | Type | Description |
|---|---|---|
| span_data | dict[str, Any] | Dictionary of data to attach to the Span |
Examples:
    with bulkhead_span("database", 10) as span_data:
        # Perform work
        result = do_work()
        span_data["result_size"] = len(result)
async_bulkhead_policy
async_bulkhead_policy(
name: str,
max_concurrent: int | None = None,
timeout: float | None = None,
) -> AsyncBulkheadPolicy
AsyncBulkheadPolicy factory — BulkheadRegistry singleton integration.
Calls the Registry's get_async() to guarantee a single global AsyncSemaphoreBulkhead instance for the same name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Domain name (Registry key) | required |
| max_concurrent | int \| None | Maximum concurrent execution count (Registry default if None). The synchronous Bulkhead is provisioned first (so the domain is registry-visible to the admin API, metrics, and shutdown iteration) and used as the configuration basis for the asynchronous instance. | None |
| timeout | float \| None | Resource acquisition timeout (fail immediately if None) | None |

Returns:
| Type | Description |
|---|---|
| AsyncBulkheadPolicy | AsyncBulkheadPolicy instance (uses the Registry singleton AsyncSemaphoreBulkhead) |
bulkhead_policy
bulkhead_policy(
name: str,
max_concurrent: int | None = None,
timeout: float | None = None,
bulkhead_type: str = "semaphore",
) -> BulkheadPolicy
BulkheadPolicy factory — BulkheadRegistry singleton integration.
Calls the Registry's get_or_create() to guarantee a single global Bulkhead instance for the same name.
Only this factory function depends on the Registry. The BulkheadPolicy class itself has no knowledge of the Registry, which keeps it easy to test.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Domain name (Registry key) | required |
| max_concurrent | int \| None | Maximum concurrent execution count (Registry default if None) | None |
| timeout | float \| None | Resource acquisition timeout (fail immediately if None) | None |
| bulkhead_type | str | "semaphore" or "thread_pool" | 'semaphore' |

Returns:
| Type | Description |
|---|---|
| BulkheadPolicy | BulkheadPolicy instance (uses the Registry singleton Bulkhead) |
get_bulkhead_registry
get_bulkhead_registry() -> BulkheadRegistry
Return the BulkheadRegistry singleton.
reset_bulkhead_registry
reset_bulkhead_registry() -> None
Reset the singleton (for testing). Calls close() before clearing.