Skip to content

baldur.adapters.sql — Framework-Free SQL Adapter (DB-API 2.0)

Generic repository base plus priority-1 SQL-backed implementations of Baldur's core repositories. Works with any DB-API 2.0 driver (psycopg2, mysql-connector-python, stdlib sqlite3) — selected by DSN scheme.

sql

Framework-free SQL adapter (DB-API 2.0).

Provides a generic repository base plus priority-1 SQL-backed implementations of Baldur's core repositories. Works with any DB-API 2.0 driver — psycopg2, mysql-connector-python, or the stdlib sqlite3 — selected by DSN scheme.

Status: Public

GenericSQLRepository

GenericSQLRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None,
    schema: (
        tuple[str, int, Callable[[SQLDialect], list[str]]]
        | None
    ) = None
)

DB-API 2.0 repository helpers.

Subclasses inherit this class plus a domain-specific ABC (FailedOperationRepository etc.). The base exposes helpers only — no ABC method has a default implementation here, so there is no MRO ambiguity.

Connection ownership: get_connection is a user-supplied callable. Baldur does not own a pool. Each helper borrows a connection via the callable and relies on the callable's close() / return-to-pool semantics. Common implementations:

  • get_connection = lambda: psycopg2.connect(DSN) — direct.
  • get_connection = engine.raw_connection — SQLAlchemy pool.
  • get_connection = lambda: pgbouncer_pool.getconn() — external pooler.

SchemaVersionManager

SchemaVersionManager(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect
)

Owns the baldur_schema_version bookkeeping table.

Repos call ensure(repo_name, version, ddl_statements) during first use. DDL runs exactly once per (repo_name, version) pair per database — subsequent calls are no-ops.

SQLCascadeEventArchiveRepository

SQLCascadeEventArchiveRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, CascadeEventArchiveRepository

DB-API 2.0 backed cascade event archive repository.

SQLCircuitBreakerStateRepository

SQLCircuitBreakerStateRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, CircuitBreakerStateRepository

DB-API 2.0 backed circuit-breaker state repository.

try_acquire_half_open_slot

try_acquire_half_open_slot(
    service_name: str,
    limit: int,
    stuck_timeout_seconds: int,
) -> tuple[bool, str, str]

Atomic HALF_OPEN slot acquisition.

Uses SELECT ... FOR UPDATE NOWAIT (PostgreSQL/MySQL 8+) to serialize concurrent acquires; on lock contention, fails open with (False, current_state, current_state). SQLite falls back to its implicit single-writer model (no NOWAIT clause).

reset_half_open_count

reset_half_open_count(service_name: str) -> None

Reset HALF_OPEN counter and clear window watermark.

record_success_with_close_check

record_success_with_close_check(
    service_name, success_threshold
)

Atomic HALF_OPEN -> CLOSED close-check via SELECT FOR UPDATE NOWAIT.

Mirrors try_acquire_half_open_slot's SQL pattern. Concurrent transactions on the same row are serialized by the row-level lock; on NOWAIT lock contention the driver exception is re-raised so the Layered wrapper records the degraded-mode metric and delegates to L1.

Branches mirror the Redis Lua: - state='half_open': increment success_count; transition to CLOSED + reset counters/watermarks when the threshold is crossed (did_close=True). Otherwise persist the increment. - state='closed': race-loser / post-crash convergence -- no write, return (did_close=False, state='closed', count=0). - state in {open, missing, unknown}: stale relative to the caller's HALF_OPEN expectation; the wrapper's stale-L2 guard falls back to L1's atomic close path.

SQLEventJournalRepository

SQLEventJournalRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None,
    max_query_limit: int = 10000
)

Bases: GenericSQLRepository, EventJournalRepository

DB-API 2.0 backed append-only event journal.

SQLFailedOperationRepository

SQLFailedOperationRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, FailedOperationRepository

DB-API 2.0 backed DLQ repository.

count_created_in_window

count_created_in_window(
    start: datetime, end: datetime
) -> int

Count rows whose created_at is in the inclusive [start, end].

Backed by idx_baldur_dlq_created_at — a range seek, not a scan.

get_facet_counts

get_facet_counts(
    *, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]

Faceted status×domain counts via GROUP BY.

by_status is scoped by domain; by_domain is scoped by status. GROUP BY drops zero-count buckets structurally. The domain-scoped by_status (WHERE domain GROUP BY status) is a covering scan over idx_baldur_dlq_status_domain; the status-scoped by_domain (WHERE status GROUP BY domain) is a prefix seek on the same composite. Both exact, both fine on the cold operator read path.

try_acquire_for_replay

try_acquire_for_replay(
    id: str, max_retries: int, force: bool = False
) -> FailedOperationData | None

Atomically flip PENDING → REPLAYING if retry budget remains.

Uses a conditional UPDATE as the concurrency guard. cursor.rowcount

0 means this worker won the race; the row is then re-read within the same transaction so no other writer can delete or mutate it between the claim and the DTO return.

force=True is the operator cap-override: it widens the WHERE status set to {PENDING, REQUIRES_REVIEW}, drops the retry_count < max_retries bound, resets retry_count to a fresh budget (1), and stamps the metadata history scar into the JSON payload — all inside the same transaction so the SELECT→UPDATE claim stays race-free against a concurrent sweep. See FailedOperationRepository.try_acquire_for_replay.

SQLPostmortemRepository

SQLPostmortemRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, PostmortemRepository

DB-API 2.0 backed postmortem repository.

SQLRecoverySessionArchiveRepository

SQLRecoverySessionArchiveRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, RecoverySessionArchiveRepository

DB-API 2.0 backed recovery session archive repository.

SQLSecurityIncidentRepository

SQLSecurityIncidentRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, SecurityIncidentRepository

DB-API 2.0 backed security incident repository.

SQLStatisticsRepository

SQLStatisticsRepository(
    get_connection: Callable[[], Any],
    *,
    dialect: SQLDialect | None = None,
    autocommit_delegated: bool | None = None
)

Bases: GenericSQLRepository, StatisticsRepositoryInterface

DB-API 2.0 backed statistics repository.

Reads from baldur_dlq and baldur_cb_state tables directly. Does not own a table — schema bootstrap is skipped.

sql_transaction

sql_transaction(conn: Any) -> Any

Suspend repo-scoped auto-commit for the duration of the block.

Usage::

with sql_transaction(conn):
    dlq_repo.save(...)
    cb_repo.update(...)
# single commit (or rollback on exception) applies to both.

All repositories whose get_connection returns conn during the block skip their per-call commit. The context manager itself issues the final commit, or rollback on exception.