baldur.interfaces — Health, Pools & Quorum

Connection-pool and PostgreSQL-admin providers (stats, leak reports, health status) and the quorum-witness protocol that prevents multi-region split-brain.

PostgreSQL admin

PgAdminProvider

Bases: ABC

Abstract interface for PostgreSQL admin SQL primitives.

Concrete implementations route SQL through a runtime-specific cursor (Django connections[alias].cursor(), DB-API 2.0 conn.cursor(), or a no-op). Callers should not assume any specific backend.

Availability gate: is_available() returns False for the no-op implementation so consumers can omit PG-specific keys from their response dicts when the underlying runtime cannot satisfy the contract.
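The availability gate can be sketched as follows. This is a minimal illustration, not the library's code: `NoOpPgAdmin`, `FakePgAdmin`, `build_health_payload`, and the payload keys are hypothetical stand-ins, and only `is_available()` and `get_active_connection_count()` come from the interface described here.

```python
class NoOpPgAdmin:
    """Hypothetical stand-in for a no-op provider (not the library's class)."""

    def is_available(self) -> bool:
        return False

    def get_active_connection_count(self) -> int:
        return 0


class FakePgAdmin:
    """Hypothetical provider that pretends a PostgreSQL backend is reachable."""

    def is_available(self) -> bool:
        return True

    def get_active_connection_count(self) -> int:
        return 3


def build_health_payload(provider) -> dict:
    """Include PG-specific keys only when the provider can satisfy the contract."""
    payload = {"status": "ok"}  # hypothetical base payload
    if provider.is_available():
        payload["pg_active_connections"] = provider.get_active_connection_count()
    return payload


noop_payload = build_health_payload(NoOpPgAdmin())
pg_payload = build_health_payload(FakePgAdmin())
```

With the no-op stand-in the PG key is simply absent, so downstream consumers cannot mistake a placeholder zero for a real measurement.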

is_available abstractmethod

is_available() -> bool

Return True iff this provider can execute PG admin SQL.

ping abstractmethod

ping() -> bool

Verify connectivity with SELECT 1.

get_connection_stats abstractmethod

get_connection_stats() -> ConnectionStats

Read connection state counts from pg_stat_activity.

get_active_connection_count abstractmethod

get_active_connection_count() -> int

Return the count of state = 'active' rows in pg_stat_activity.

pg_sleep abstractmethod

pg_sleep(seconds: float) -> None

Execute SELECT pg_sleep(...).

execute_slow_query abstractmethod

execute_slow_query(seconds: int) -> None

Hold a backend session via pg_sleep for the given duration.

get_backend_pid_with_delay abstractmethod

get_backend_pid_with_delay(
    delay_seconds: float = 0.01,
) -> int

Return pg_backend_pid() after a brief pg_sleep delay.

acquire_advisory_lock abstractmethod

acquire_advisory_lock(
    lock_id: int, wait: bool = True
) -> bool

Acquire an exclusive advisory lock.

acquire_advisory_lock_shared abstractmethod

acquire_advisory_lock_shared(
    lock_id: int, wait: bool = True
) -> bool

Acquire a shared advisory lock.

release_advisory_lock abstractmethod

release_advisory_lock(lock_id: int) -> bool

Release an exclusive advisory lock.

release_advisory_lock_shared abstractmethod

release_advisory_lock_shared(lock_id: int) -> bool

Release a shared advisory lock.

try_advisory_lock abstractmethod

try_advisory_lock(lock_id: int) -> bool

Attempt a non-blocking exclusive advisory lock.

set_lock_timeout abstractmethod

set_lock_timeout(timeout_ms: int) -> None

Set session-level lock_timeout in milliseconds (0 = unlimited).

set_statement_timeout abstractmethod

set_statement_timeout(timeout_ms: int) -> None

Set session-level statement_timeout in milliseconds (0 = unlimited).

reset_timeouts abstractmethod

reset_timeouts() -> None

Reset both lock_timeout and statement_timeout to unlimited.

execute_aggregate_query abstractmethod

execute_aggregate_query(
    table_name: str,
) -> tuple[int, float, float, float]

Execute COUNT/AVG/MAX/MIN over table_name.

Returns (total_count, avg_price, max_price, min_price).

execute_nonexistent_table_query abstractmethod

execute_nonexistent_table_query() -> None

Execute a query against a missing table (used to trigger CB failures).

execute_timeout_query abstractmethod

execute_timeout_query(
    timeout_ms: int = 1, sleep_seconds: int = 1
) -> None

Execute a query that exceeds statement_timeout (CB testing).

advisory_lock_context abstractmethod

advisory_lock_context(
    lock_id: int, exclusive: bool = True, wait: bool = True
) -> Generator[bool, None, None]

Hold an advisory lock for the duration of a with block.
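One way such a context manager could behave is sketched below, under stated assumptions. `InMemoryLockAdmin` is a hypothetical stand-in that tracks lock ids in a set instead of issuing SQL, and it models contention between two different sessions (real PostgreSQL advisory locks are re-entrant within one session). Only the method names and signatures are taken from this page.

```python
from contextlib import contextmanager
from typing import Generator


class InMemoryLockAdmin:
    """Hypothetical provider that tracks exclusive locks in a set (no SQL)."""

    def __init__(self) -> None:
        self.held = set()

    def try_advisory_lock(self, lock_id: int) -> bool:
        if lock_id in self.held:
            return False
        self.held.add(lock_id)
        return True

    def release_advisory_lock(self, lock_id: int) -> bool:
        if lock_id in self.held:
            self.held.remove(lock_id)
            return True
        return False

    @contextmanager
    def advisory_lock_context(
        self, lock_id: int, exclusive: bool = True, wait: bool = True
    ) -> Generator[bool, None, None]:
        # Simplification: `exclusive` and `wait` are ignored; only the
        # exclusive, non-blocking path is modeled here.
        acquired = self.try_advisory_lock(lock_id)
        try:
            yield acquired
        finally:
            # Release only what this block actually acquired.
            if acquired:
                self.release_advisory_lock(lock_id)


admin = InMemoryLockAdmin()
results = []
with admin.advisory_lock_context(42, wait=False) as outer:
    results.append(outer)
    with admin.advisory_lock_context(42, wait=False) as inner:
        results.append(inner)
    # The failed inner attempt must not have released the outer lock.
    results.append(42 in admin.held)
```

Because the context manager yields a bool, callers are expected to check it before doing protected work rather than assume the lock was obtained.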

timeout_context abstractmethod

timeout_context(
    lock_timeout_ms: int = 0, statement_timeout_ms: int = 0
) -> Generator[None, None, None]

Apply session timeouts within a with block, restoring on exit.
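The set-then-restore pattern can be sketched as follows. `RecordingPgAdmin` is a hypothetical stand-in that records calls instead of running SQL, and modeling "restoring on exit" as a call to `reset_timeouts()` is an assumption; a real implementation might restore the previous values instead.

```python
from contextlib import contextmanager
from typing import Generator


class RecordingPgAdmin:
    """Hypothetical provider that records timeout calls instead of running SQL."""

    def __init__(self) -> None:
        self.calls = []

    def set_lock_timeout(self, timeout_ms: int) -> None:
        self.calls.append(("lock_timeout", timeout_ms))

    def set_statement_timeout(self, timeout_ms: int) -> None:
        self.calls.append(("statement_timeout", timeout_ms))

    def reset_timeouts(self) -> None:
        self.calls.append(("reset", 0))

    @contextmanager
    def timeout_context(
        self, lock_timeout_ms: int = 0, statement_timeout_ms: int = 0
    ) -> Generator[None, None, None]:
        # Assumption: "restoring on exit" is modeled as reset_timeouts().
        self.set_lock_timeout(lock_timeout_ms)
        self.set_statement_timeout(statement_timeout_ms)
        try:
            yield
        finally:
            # Runs even if the body raises, so the session is not left limited.
            self.reset_timeouts()


provider = RecordingPgAdmin()
try:
    with provider.timeout_context(lock_timeout_ms=500, statement_timeout_ms=2000):
        raise RuntimeError("simulated query failure")
except RuntimeError:
    pass
```

The `try`/`finally` is the important part: the reset still happens when the body fails, which is the usual reason to prefer a context manager over paired set/reset calls.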

create_cursor abstractmethod

create_cursor() -> Any

Create a cursor whose lifecycle is owned by the caller.

Used by pool-exhaustion paths that hold cursors externally (StressTestService._held_connections).

execute_with_cursor abstractmethod

execute_with_cursor(
    cursor: Any, query: str, params: list[Any] | None = None
) -> Any

Execute query on a caller-supplied cursor and return fetchone().

ConnectionStats dataclass

ConnectionStats(
    total_connections: int,
    active: int,
    idle: int,
    idle_in_transaction: int,
)

PostgreSQL connection statistics from pg_stat_activity.

AdvisoryLockResult dataclass

AdvisoryLockResult(
    acquired: bool, lock_id: int, error: str | None = None
)

Result of an advisory lock attempt.

Pool monitoring

PoolInfoProvider

Bases: ABC

Abstract interface for connection-pool stats discovery (OSS dict shape).

Concrete implementations return the existing OSS dict shape with keys: pool_type, pool_size, max_overflow, checkedin, checkedout, overflow, total_capacity, available, pool_exhausted. An empty dict signals "no pool detected".

get_pool_info abstractmethod

get_pool_info() -> dict[str, Any]

Return pool stats as a dict (empty when no pool is reachable).
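A consumer of this dict shape might look like the sketch below. `summarize_pool` and the sample values are hypothetical; the key names are the ones listed above, and the empty-dict check follows the "no pool detected" convention.

```python
from typing import Any


def summarize_pool(info: dict[str, Any]) -> str:
    """Turn a pool-info dict into a one-line summary.

    An empty dict is treated as "no pool detected".
    """
    if not info:
        return "no pool detected"
    state = "EXHAUSTED" if info.get("pool_exhausted") else "ok"
    return (
        f"{info.get('pool_type', 'unknown')}: "
        f"{info.get('checkedout', 0)} checked out, "
        f"{info.get('available', 0)} available ({state})"
    )


# Hypothetical values, for illustration only.
sample = {
    "pool_type": "QueuePool",
    "pool_size": 5,
    "max_overflow": 10,
    "checkedin": 3,
    "checkedout": 2,
    "overflow": 0,
    "total_capacity": 15,
    "available": 13,
    "pool_exhausted": False,
}
```

Reading keys with `.get()` keeps the consumer tolerant of providers that omit a key, which is a defensive choice rather than something the interface requires.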

ConnectionInfo dataclass

ConnectionInfo(
    connection_id: str,
    acquired_at: datetime,
    stack_trace: str | None = None,
    query_info: str | None = None,
    thread_id: int | None = None,
)

Tracked connection information for leak detection.

LeakReport dataclass

LeakReport(
    suspected_leaks: list[ConnectionInfo],
    leak_threshold_seconds: float,
    report_time: datetime,
)

Connection leak detection report.
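How a report could be assembled from tracked connections is sketched below. The two dataclasses mirror the signatures shown above so the example is self-contained; `build_leak_report` and its rule (a connection is suspect once it has been held longer than the threshold) are assumptions, not the library's detection logic.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class ConnectionInfo:  # mirrors the signature documented above
    connection_id: str
    acquired_at: datetime
    stack_trace: Optional[str] = None
    query_info: Optional[str] = None
    thread_id: Optional[int] = None


@dataclass
class LeakReport:  # mirrors the signature documented above
    suspected_leaks: list
    leak_threshold_seconds: float
    report_time: datetime


def build_leak_report(tracked, threshold_seconds: float, now: datetime) -> LeakReport:
    """Hypothetical helper: flag connections held longer than the threshold."""
    suspects = [
        conn
        for conn in tracked
        if (now - conn.acquired_at).total_seconds() > threshold_seconds
    ]
    return LeakReport(
        suspected_leaks=suspects,
        leak_threshold_seconds=threshold_seconds,
        report_time=now,
    )


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
tracked = [
    ConnectionInfo("c1", now - timedelta(seconds=120)),  # held for 2 minutes
    ConnectionInfo("c2", now - timedelta(seconds=5)),    # recently acquired
]
report = build_leak_report(tracked, threshold_seconds=60.0, now=now)
```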

NoOpPoolStatsProvider

NoOpPoolStatsProvider(pool_name: str = 'noop')

Bases: PoolStatsProvider

Default OSS pool stats provider — returns empty stats.

Registered as ProviderRegistry.pool_monitor default so callers always receive a usable provider (never None). PRO overrides the default with a realized backend when baldur_pro is installed.

PoolHealthStatus

Bases: str, Enum

Connection pool health status.

PoolStats dataclass

PoolStats(
    pool_name: str,
    max_connections: int,
    active_connections: int,
    available_connections: int,
    waiting_requests: int = 0,
    checked_at: datetime = (lambda: utc_now())(),
)

Connection pool statistics.

PoolStatsProvider

Bases: ABC

Abstract interface for pool statistics providers.

Concrete implementations live in baldur.adapters.pool (OSS: in-memory, SQLAlchemy-backed) and in baldur_pro (PRO: realized backends). The PRO ConnectionPoolMonitor consumes any implementation of this ABC.

get_stats abstractmethod

get_stats() -> PoolStats

Get current pool statistics.
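A custom provider might be written along these lines. The `PoolStats` and `PoolStatsProvider` definitions below are local stand-ins that mirror the signatures on this page so the sketch runs on its own; `FixedPoolStatsProvider` and `utilization` are hypothetical names, and `datetime.now(timezone.utc)` stands in for the library's `utc_now()`.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class PoolStats:  # local stand-in mirroring the documented fields
    pool_name: str
    max_connections: int
    active_connections: int
    available_connections: int
    waiting_requests: int = 0
    checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class PoolStatsProvider(ABC):  # local stand-in for the documented ABC
    @abstractmethod
    def get_stats(self) -> PoolStats:
        """Get current pool statistics."""


class FixedPoolStatsProvider(PoolStatsProvider):
    """Hypothetical provider that reports fixed numbers (useful in tests)."""

    def __init__(self, pool_name: str, max_connections: int, active: int) -> None:
        self.pool_name = pool_name
        self.max_connections = max_connections
        self.active = active

    def get_stats(self) -> PoolStats:
        return PoolStats(
            pool_name=self.pool_name,
            max_connections=self.max_connections,
            active_connections=self.active,
            available_connections=self.max_connections - self.active,
        )


def utilization(stats: PoolStats) -> float:
    """Fraction of the pool in use; 0.0 when the pool has no capacity."""
    if stats.max_connections <= 0:
        return 0.0
    return stats.active_connections / stats.max_connections


stats = FixedPoolStatsProvider("main", max_connections=20, active=5).get_stats()
```

Guarding the zero-capacity case matters because an empty-stats default provider would otherwise cause a division by zero in callers.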

ConnectionPoolMonitor

Bases: Protocol

Protocol for the PRO connection-pool monitor.

PRO ships the realized backend; OSS adapters (e.g., PoolWatchdog) hold an injected monitor reference and call its public surface. The protocol follows interface segregation: it declares only the methods that OSS code currently calls.

Quorum

QuorumLease dataclass

QuorumLease(
    region: str,
    acquired_at: float,
    expires_at: float,
    lease_id: str,
)

Quorum lease information.

The region holding the lease acts as the Primary for the duration of the lease.

Attributes:

Name         Type   Description
region       str    Region that holds the lease
acquired_at  float  Acquisition time (Unix timestamp)
expires_at   float  Expiry time (Unix timestamp)
lease_id     str    Lease ID (used for verification on re-acquisition)

region instance-attribute

region: str

Region that holds the lease.

acquired_at instance-attribute

acquired_at: float

Acquisition time (Unix timestamp).

expires_at instance-attribute

expires_at: float

Expiry time (Unix timestamp).

lease_id instance-attribute

lease_id: str

Lease ID (used for verification on re-acquisition).

is_valid

is_valid() -> bool

Check whether the lease is still valid.

QuorumWitnessProtocol

Bases: Protocol

Quorum Witness protocol.

Prevents split-brain during Primary election in a multi-region environment. All implementations must provide the following guarantees:

  1. At most one region can be Primary at any given time (mutual exclusion)
  2. Leases expire automatically based on TTL (liveness)
  3. Only the lease holder can renew or release the lease (fencing)
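The three guarantees above can be illustrated with a toy, single-process witness. This is a sketch under stated assumptions, not a conforming implementation: `Lease` and `InMemoryWitness` are hypothetical, the method names differ from the protocol on purpose, and a real witness would rely on an external store with compare-and-set semantics. Time is passed in explicitly as `now` so the behavior is deterministic.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Lease:  # hypothetical, shaped like QuorumLease
    region: str
    acquired_at: float
    expires_at: float
    lease_id: str


class InMemoryWitness:
    """Hypothetical single-process witness for illustration only."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl = ttl_seconds
        self._lease: Optional[Lease] = None

    def _current(self, now: float) -> Optional[Lease]:
        # Liveness: an expired lease is treated as absent.
        if self._lease is not None and now >= self._lease.expires_at:
            self._lease = None
        return self._lease

    def try_acquire(self, region: str, now: float) -> Optional[Lease]:
        # Mutual exclusion: grant only when no unexpired lease exists.
        if self._current(now) is not None:
            return None
        self._lease = Lease(region, now, now + self.ttl, uuid.uuid4().hex)
        return self._lease

    def renew(self, lease_id: str, now: float) -> bool:
        # Fencing: only the holder's lease_id may extend the lease.
        lease = self._current(now)
        if lease is None or lease.lease_id != lease_id:
            return False
        lease.expires_at = now + self.ttl
        return True

    def release(self, lease_id: str, now: float) -> bool:
        # Fencing applies to release as well.
        lease = self._current(now)
        if lease is None or lease.lease_id != lease_id:
            return False
        self._lease = None
        return True


witness = InMemoryWitness(ttl_seconds=10.0)
east = witness.try_acquire("us-east", now=0.0)
west_blocked = witness.try_acquire("eu-west", now=5.0)    # lease still held
forged_renew = witness.renew("not-the-holder", now=5.0)   # wrong lease_id
west_after_expiry = witness.try_acquire("eu-west", now=10.0)  # TTL elapsed
stale_renew = witness.renew(east.lease_id, now=11.0)      # old holder is fenced
```

The last line shows why the lease ID matters: once the lease has moved to another region, the previous holder's renewal is rejected even though it once held a valid lease.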

try_acquire_primary

try_acquire_primary() -> bool

Attempt to acquire the Primary lock.

If this region is already Primary, renews the lease and returns True (idempotent). On renewal failure, resets local state and attempts a new CAS operation.

Returns:

Type  Description
bool  True if acquisition succeeded (this region is Primary)

renew_lease

renew_lease() -> bool

Renew the lease.

Returns:

Type  Description
bool  True if renewal succeeded

release_lease

release_lease() -> None

Release the lease.

get_current_primary

get_current_primary() -> str | None

Query the current Primary region.

Returns:

Type        Description
str | None  Name of the Primary region, or None

is_primary

is_primary() -> bool

Check whether the current region is Primary (local check, hot path).

Applies safety_margin to return False earlier than the actual expiry. Use is_primary_verified() for critical decisions.

Returns:

Type  Description
bool  True if Primary
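The effect of a safety margin on the local check can be sketched as below. The source does not specify how safety_margin is applied, so this assumes it is a number of seconds subtracted from the lease expiry; `is_primary_local` is a hypothetical helper, not part of the protocol.

```python
def is_primary_local(expires_at: float, now: float, safety_margin: float) -> bool:
    """Report Primary only while more than `safety_margin` seconds remain.

    Assumption: the margin is subtracted from the expiry time, so the check
    turns False before the lease actually expires on the backend.
    """
    return now < expires_at - safety_margin


# Lease expires at t=100 with a 5-second margin (hypothetical numbers).
well_inside = is_primary_local(expires_at=100.0, now=94.0, safety_margin=5.0)
inside_margin = is_primary_local(expires_at=100.0, now=96.0, safety_margin=5.0)
```

At t=96 the lease is still technically unexpired, but the local check already reports False. Stepping down early leaves room for clock skew and renewal latency before another region can acquire the lease.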

is_primary_verified

is_primary_verified() -> bool

Primary check with server-side cross-validation (cold path).

Queries the backend server to verify that the lease still exists. Involves a network call, so do not use on the hot path.

Returns:

Type  Description
bool  True if the lease is also valid on the server
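One way to combine the hot-path and cold-path checks is sketched below. `StubWitness` and `should_run_critical_action` are hypothetical; the stub counts simulated backend round trips so the cost difference between the two methods is visible.

```python
class StubWitness:
    """Hypothetical witness stand-in; counts simulated backend round trips."""

    def __init__(self, local: bool, server: bool) -> None:
        self.local = local
        self.server = server
        self.network_calls = 0

    def is_primary(self) -> bool:
        # Hot path: local state only, no network.
        return self.local

    def is_primary_verified(self) -> bool:
        # Cold path: one simulated round trip to the backend.
        self.network_calls += 1
        return self.local and self.server


def should_run_critical_action(witness) -> bool:
    """Cheap local check first; verify with the backend only if it passes."""
    if not witness.is_primary():
        return False
    return witness.is_primary_verified()


follower = StubWitness(local=False, server=False)
stale_primary = StubWitness(local=True, server=False)  # lease lost on the server
healthy_primary = StubWitness(local=True, server=True)

follower_result = should_run_critical_action(follower)
stale_result = should_run_critical_action(stale_primary)
healthy_result = should_run_critical_action(healthy_primary)
```

A follower never pays for the network call, while a region whose local state is stale is caught by the verified check before it acts.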

get_lease

get_lease() -> QuorumLease | None

Return the current lease.

start_auto_renew

start_auto_renew() -> None

Start automatic renewal.

stop_auto_renew

stop_auto_renew() -> None

Stop automatic renewal.