baldur.interfaces — Task Queue

The task-queue contract (sync and async) with its status/priority enums, result and option DTOs, and the queue exception hierarchy. Adapter authors implement these to back Baldur on Celery, RQ, arq, and others.

Enums

TaskStatus

Bases: str, Enum

Task execution status

TaskPriority

Bases: IntEnum

Task priority levels (higher = processed sooner)
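
Because TaskPriority derives from IntEnum, priorities compare and sort like integers. The following is a minimal sketch, not Baldur's definition: only NORMAL and HIGH appear on this page, so the LOW member and every numeric value below are assumptions chosen for illustration.

```python
from enum import IntEnum


class TaskPriority(IntEnum):
    """Stand-in enum; member values are assumptions for illustration."""

    LOW = 1      # hypothetical member
    NORMAL = 5   # value assumed
    HIGH = 9     # value assumed


# Higher value = processed sooner, so sort descending to get dispatch order.
pending = [
    ("report", TaskPriority.LOW),
    ("payment", TaskPriority.HIGH),
    ("email", TaskPriority.NORMAL),
]
dispatch_order = [
    name for name, _ in sorted(pending, key=lambda item: item[1], reverse=True)
]
```

An adapter for a backend with its own priority scale would translate these integers into whatever that backend expects.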

DTOs

TaskResult dataclass

TaskResult(
    task_id: str,
    status: TaskStatus,
    result: Any | None = None,
    error: str | None = None,
    traceback: str | None = None,
    retries: int = 0,
    started_at: datetime | None = None,
    completed_at: datetime | None = None,
)

Result of task execution or status check.

Immutable dataclass representing the outcome or current state of a queued task.

Attributes:

Name Type Description
task_id str

Unique task identifier

status TaskStatus

Current task status

result Any | None

Return value if task succeeded

error str | None

Error message if task failed

traceback str | None

Full traceback string if task failed

retries int

Number of retry attempts made

started_at datetime | None

When task execution began

completed_at datetime | None

When task execution completed

is_finished property

is_finished: bool

Check if task has completed (success or failure).

is_successful property

is_successful: bool

Check if task completed successfully.

duration property

duration: timedelta | None

Calculate task execution duration.

TaskOptions dataclass

TaskOptions(
    countdown: int | None = None,
    eta: datetime | None = None,
    expires: datetime | None = None,
    retry: bool = True,
    max_retries: int = 3,
    retry_backoff: bool = True,
    retry_backoff_max: int = 600,
    retry_jitter: bool = True,
    queue: str | None = None,
    priority: TaskPriority = TaskPriority.NORMAL,
    timeout: int | None = None,
    soft_timeout: int | None = None,
)

Options for task enqueueing.

Configures how a task should be executed, including scheduling, retries, and queue selection.

Attributes:

Name Type Description
countdown int | None

Delay in seconds before execution

eta datetime | None

Exact time to execute task

expires datetime | None

Task expiration time (won't run after this)

retry bool

Enable automatic retries on failure

max_retries int

Maximum retry attempts

retry_backoff bool

Use exponential backoff between retries

retry_backoff_max int

Maximum backoff delay in seconds

retry_jitter bool

Add randomness to backoff delays

queue str | None

Target queue name (None selects the 'default' queue)

priority TaskPriority

Task priority (higher = processed sooner)

timeout int | None

Task execution timeout in seconds

soft_timeout int | None

Soft timeout (raises SoftTimeLimitExceeded)

with_countdown

with_countdown(seconds: int) -> TaskOptions

Create new options with countdown.

with_priority

with_priority(priority: TaskPriority) -> TaskOptions

Create new options with priority.

ScheduleInfo dataclass

ScheduleInfo(
    schedule_id: str,
    task_name: str,
    interval: timedelta,
    args: tuple = tuple(),
    kwargs: dict = dict(),
    last_run: datetime | None = None,
    next_run: datetime | None = None,
    enabled: bool = True,
)

Information about a periodic schedule.

Attributes:

Name Type Description
schedule_id str

Unique schedule identifier

task_name str

Name of the scheduled task

interval timedelta

Execution interval

args tuple

Positional arguments for task

kwargs dict

Keyword arguments for task

last_run datetime | None

When task last executed

next_run datetime | None

When task will next execute

enabled bool

Whether schedule is active

Exceptions

TaskQueueError

TaskQueueError(message: str = '', *, code: str = '')

Bases: BaldurError

Base exception for task queue errors.

TaskNotFoundError

TaskNotFoundError(message: str = '', *, code: str = '')

Bases: TaskQueueError

Raised when a task is not registered.

TaskTimeoutError

TaskTimeoutError(message: str = '', *, code: str = '')

Bases: TaskQueueError

Raised when task execution times out.

TaskRevokedError

TaskRevokedError(message: str = '', *, code: str = '')

Bases: TaskQueueError

Raised when a revoked task is accessed.
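
Since every queue exception derives from TaskQueueError, callers can handle specific failures first and fall back to the base class. The sketch below rebuilds the hierarchy with stand-in classes so that it runs on its own. Storing message and code as attributes, the classify helper, and the "TASK_NOT_FOUND" code string are assumptions rather than documented behavior.

```python
from typing import Callable


class BaldurError(Exception):
    """Stand-in for Baldur's base error; the real class lives in the library."""

    def __init__(self, message: str = "", *, code: str = "") -> None:
        super().__init__(message)
        self.message = message  # attribute names are assumed
        self.code = code


class TaskQueueError(BaldurError):
    """Base exception for task queue errors."""


class TaskNotFoundError(TaskQueueError):
    """Raised when a task is not registered."""


class TaskTimeoutError(TaskQueueError):
    """Raised when task execution times out."""


def classify(call: Callable[[], object]) -> str:
    """Run `call` and map any queue failure to a coarse handling decision."""
    try:
        call()
    except TaskTimeoutError:
        # Most specific handlers come first.
        return "retry-later"
    except TaskQueueError as exc:
        # Catches every other subclass, including TaskNotFoundError.
        return f"queue-error:{exc.code}"
    return "ok"


def raise_not_found() -> None:
    raise TaskNotFoundError("no such task", code="TASK_NOT_FOUND")


def raise_timeout() -> None:
    raise TaskTimeoutError("took too long")
```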

Interfaces

TaskQueueInterface

Bases: ABC

Abstract interface for background task queues, exposed through a synchronous (blocking) API.

This interface defines the contract for background task execution systems. It lets Baldur work with different task queue backends interchangeably.

Implementations
  • CeleryTaskAdapter (current - Celery)
  • RQTaskAdapter (planned - Redis Queue)
  • DramatiqTaskAdapter (planned)
  • SyncTaskAdapter (for testing - synchronous execution)

Example

queue = ProviderRegistry.get_queue()

# Enqueue a task
task_id = queue.enqueue(
    "process_payment",
    args=(order_id,),
    options=TaskOptions(priority=TaskPriority.HIGH),
)

# Check result later
result = queue.get_result(task_id)
if result.is_successful:
    print(f"Payment processed: {result.result}")

provider_name abstractmethod property

provider_name: str

Return the provider name.

Returns:

Type Description
str

Provider identifier (e.g., 'celery', 'rq', 'dramatiq')

task abstractmethod

task(
    name: str | None = None,
    bind: bool = False,
    max_retries: int = 3,
    autoretry_for: tuple[type[Exception], ...] = (),
    retry_backoff: bool = True,
    retry_backoff_max: int = 600,
    retry_jitter: bool = True,
    rate_limit: str | None = None,
    time_limit: int | None = None,
    soft_time_limit: int | None = None,
) -> Callable[[F], F]

Decorator to register a function as a task.

Parameters:

Name Type Description Default
name str | None

Task name (default: function qualified name)

None
bind bool

If True, pass task instance as first argument

False
max_retries int

Maximum retry attempts on failure

3
autoretry_for tuple[type[Exception], ...]

Exception types to automatically retry

()
retry_backoff bool

Use exponential backoff between retries

True
retry_backoff_max int

Maximum backoff delay in seconds

600
retry_jitter bool

Add randomness to prevent thundering herd

True
rate_limit str | None

Rate limit (e.g., "10/m" for 10 per minute)

None
time_limit int | None

Hard time limit in seconds

None
soft_time_limit int | None

Soft time limit (raises exception)

None

Returns:

Type Description
Callable[[F], F]

Decorator function

Example

@queue.task(max_retries=5, autoretry_for=(ConnectionError,))
def process_payment(payment_id: int):
    # Process the payment
    pass

register_task

register_task(
    func: Callable, name: str | None = None, **options: Any
) -> str

Register a function as a task programmatically.

Parameters:

Name Type Description Default
func Callable

Function to register

required
name str | None

Task name (default: function qualified name)

None
**options Any

Additional task options

{}

Returns:

Type Description
str

Registered task name

enqueue abstractmethod

enqueue(
    task_name: str,
    args: tuple = (),
    kwargs: dict | None = None,
    options: TaskOptions | None = None,
) -> str

Enqueue a task for async execution.

Parameters:

Name Type Description Default
task_name str

Registered task name

required
args tuple

Positional arguments for task

()
kwargs dict | None

Keyword arguments for task

None
options TaskOptions | None

Execution options

None

Returns:

Type Description
str

Task ID for tracking

Raises:

Type Description
TaskNotFoundError

If task_name is not registered

Example

task_id = queue.enqueue(
    "send_notification",
    args=(user_id, "Welcome!"),
    options=TaskOptions(countdown=60),
)

enqueue_many abstractmethod

enqueue_many(
    tasks: list[tuple[str, tuple, dict]],
    options: TaskOptions | None = None,
) -> list[str]

Enqueue multiple tasks atomically.

Parameters:

Name Type Description Default
tasks list[tuple[str, tuple, dict]]

List of (task_name, args, kwargs) tuples

required
options TaskOptions | None

Shared execution options for all tasks

None

Returns:

Type Description
list[str]

List of task IDs in same order as input

Note

Implementations should ensure either all tasks are enqueued or none are (atomic operation).
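
One way to approach the all-or-nothing requirement is to validate the whole batch before publishing any of it. The sketch below does this on a toy in-memory backend. InMemoryQueue is hypothetical, LookupError stands in for TaskNotFoundError, and a real broker would still need its own transactional primitive to make the publish step atomic.

```python
import uuid


class InMemoryQueue:
    """Hypothetical toy backend used only to illustrate the atomicity note."""

    def __init__(self, registered: set[str]) -> None:
        self.registered = registered
        self.pending: list[tuple[str, str, tuple, dict]] = []

    def enqueue_many(self, tasks: list[tuple[str, tuple, dict]]) -> list[str]:
        # Phase 1: validate everything before touching the queue.
        unknown = [name for name, _, _ in tasks if name not in self.registered]
        if unknown:
            # A real adapter would raise TaskNotFoundError here.
            raise LookupError(f"unregistered tasks: {unknown}")
        # Phase 2: build the batch, then publish it in a single step.
        batch = [
            (uuid.uuid4().hex, name, args, kwargs) for name, args, kwargs in tasks
        ]
        self.pending.extend(batch)
        return [task_id for task_id, _, _, _ in batch]


queue = InMemoryQueue({"resize", "notify"})
ids = queue.enqueue_many([("resize", (1,), {}), ("notify", (), {"user": 7})])
```

A batch that names an unregistered task fails in phase 1, so nothing from that batch reaches the pending list.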

delay

delay(task_name: str, *args: Any, **kwargs: Any) -> str

Convenience method to enqueue a task immediately.

Parameters:

Name Type Description Default
task_name str

Registered task name

required
*args Any

Positional arguments for task

()
**kwargs Any

Keyword arguments for task

{}

Returns:

Type Description
str

Task ID for tracking

apply_async

apply_async(
    task_name: str,
    args: tuple = (),
    kwargs: dict | None = None,
    countdown: int | None = None,
    eta: datetime | None = None,
    **extra_options: Any
) -> str

Enqueue a task with common options as keyword arguments.

Parameters:

Name Type Description Default
task_name str

Registered task name

required
args tuple

Positional arguments

()
kwargs dict | None

Keyword arguments

None
countdown int | None

Delay in seconds

None
eta datetime | None

Exact execution time

None
**extra_options Any

Additional TaskOptions fields

{}

Returns:

Type Description
str

Task ID for tracking

get_result abstractmethod

get_result(
    task_id: str, timeout: float | None = None
) -> TaskResult

Get task result (may block if timeout provided).

Parameters:

Name Type Description Default
task_id str

Task ID from enqueue

required
timeout float | None

Max seconds to wait for completion

None

Returns:

Type Description
TaskResult

TaskResult with status and result/error

Note

If timeout is None, returns immediately with current status. If timeout is provided, blocks until task completes or timeout expires.

revoke abstractmethod

revoke(
    task_id: str,
    terminate: bool = False,
    signal: str = "SIGTERM",
) -> bool

Cancel a pending or running task.

Parameters:

Name Type Description Default
task_id str

Task ID to cancel

required
terminate bool

If True, terminate running task

False
signal str

Signal to send if terminating

'SIGTERM'

Returns:

Type Description
bool

True if task was revoked

Note

Revoking a pending task prevents execution. Terminating a running task sends the specified signal.

retry abstractmethod

retry(
    task_id: str,
    countdown: int | None = None,
    max_retries: int | None = None,
) -> str

Retry a failed task.

Parameters:

Name Type Description Default
task_id str

Original task ID

required
countdown int | None

Delay before retry

None
max_retries int | None

Override maximum retries

None

Returns:

Type Description
str

New task ID for the retry

Note

This creates a new task based on the original. The original task's state remains unchanged.

forget

forget(task_id: str) -> bool

Forget a task result (cleanup).

Parameters:

Name Type Description Default
task_id str

Task ID to forget

required

Returns:

Type Description
bool

True if result was forgotten

schedule_periodic abstractmethod

schedule_periodic(
    task_name: str,
    schedule: timedelta,
    args: tuple = (),
    kwargs: dict | None = None,
    name: str | None = None,
) -> str

Schedule a periodic task.

Parameters:

Name Type Description Default
task_name str

Registered task name

required
schedule timedelta

Execution interval

required
args tuple

Positional arguments for task

()
kwargs dict | None

Keyword arguments for task

None
name str | None

Unique schedule name (auto-generated if not provided)

None

Returns:

Type Description
str

Schedule ID

Example

schedule_id = queue.schedule_periodic(
    "cleanup_expired_tokens",
    schedule=timedelta(hours=1),
)

unschedule abstractmethod

unschedule(schedule_id: str) -> bool

Remove a periodic schedule.

Parameters:

Name Type Description Default
schedule_id str

Schedule ID to remove

required

Returns:

Type Description
bool

True if schedule was removed

get_schedule

get_schedule(schedule_id: str) -> ScheduleInfo | None

Get information about a periodic schedule.

Parameters:

Name Type Description Default
schedule_id str

Schedule ID to query

required

Returns:

Type Description
ScheduleInfo | None

ScheduleInfo or None if not found

list_schedules

list_schedules() -> list[ScheduleInfo]

List all periodic schedules.

Returns:

Type Description
list[ScheduleInfo]

List of ScheduleInfo objects

purge_queue abstractmethod

purge_queue(queue_name: str = 'default') -> int

Remove all pending tasks from a queue.

Parameters:

Name Type Description Default
queue_name str

Queue to purge

'default'

Returns:

Type Description
int

Number of tasks purged

Warning

This permanently removes all pending tasks. Use with caution in production.

queue_length abstractmethod

queue_length(queue_name: str = 'default') -> int

Get number of pending tasks in queue.

Parameters:

Name Type Description Default
queue_name str

Queue to check

'default'

Returns:

Type Description
int

Number of pending tasks

list_queues

list_queues() -> list[str]

List all known queue names.

Returns:

Type Description
list[str]

List of queue names

active_count

active_count() -> int

Get number of currently executing tasks.

Returns:

Type Description
int

Number of active tasks across all workers

health_check abstractmethod

health_check() -> bool

Check if task queue backend is reachable.

Returns:

Type Description
bool

True if broker and backend are healthy

worker_count

worker_count() -> int

Get number of active workers.

Returns:

Type Description
int

Number of workers processing tasks

ping

ping() -> bool

Simple connectivity check.

Returns:

Type Description
bool

True if connection is alive

AsyncTaskQueueInterface

Bases: ABC

Async interface for task queues.

Intended for async-native backends (arq, Taskiq, SAQ). Sync backends (Celery, RQ) use TaskQueueInterface instead.

Implementations
  • ArqTaskAdapter (arq - Redis-based async)
  • AsyncSyncTaskAdapter (for testing - async wrapper over sync)

provider_name abstractmethod property

provider_name: str

Return the provider name (e.g., 'arq', 'taskiq').

task abstractmethod

task(
    name: str | None = None,
    *,
    max_retries: int = 3,
    timeout: int | None = None,
    queue: str | None = None
) -> Callable[
    [Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]
]

Decorator to register an async function as a task.

Registered functions MUST be async (coroutine functions). The type signature enforces this at type-check time: mypy rejects registration of a sync function.

enqueue abstractmethod async

enqueue(
    task_name: str,
    args: tuple = (),
    kwargs: dict | None = None,
    options: TaskOptions | None = None,
) -> str

Enqueue a task for async execution. Returns task ID.

enqueue_many abstractmethod async

enqueue_many(
    tasks: list[tuple[str, tuple, dict]],
    options: TaskOptions | None = None,
) -> list[str]

Enqueue multiple tasks. Returns list of task IDs.

Raises:

Type Description
PartialEnqueueError

Implementations may raise this when some tasks fail. Contains succeeded (index, ID) pairs and failed (index, exception) pairs for caller-side recovery.
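
Caller-side recovery usually means resubmitting only the entries that failed. The sketch below uses a stand-in exception whose succeeded and failed attributes follow the description above. The constructor signature and the tasks_to_retry helper are assumptions.

```python
class PartialEnqueueError(Exception):
    """Stand-in; attribute names follow the description above."""

    def __init__(
        self,
        succeeded: list[tuple[int, str]],
        failed: list[tuple[int, Exception]],
    ) -> None:
        total = len(succeeded) + len(failed)
        super().__init__(f"{len(failed)} of {total} tasks failed")
        self.succeeded = succeeded
        self.failed = failed


def tasks_to_retry(
    tasks: list[tuple[str, tuple, dict]],
    exc: PartialEnqueueError,
) -> list[tuple[str, tuple, dict]]:
    """Select only the submissions that failed, preserving input order."""
    failed_indexes = sorted(index for index, _ in exc.failed)
    return [tasks[index] for index in failed_indexes]


batch = [("resize", (1,), {}), ("resize", (2,), {}), ("resize", (3,), {})]
error = PartialEnqueueError(
    succeeded=[(0, "id-a"), (2, "id-c")],
    failed=[(1, ConnectionError("broker unavailable"))],
)
retry_batch = tasks_to_retry(batch, error)
```

The succeeded pairs let the caller keep tracking tasks that were accepted, so a retry does not enqueue them a second time.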

delay async

delay(task_name: str, *args: Any, **kwargs: Any) -> str

Convenience: enqueue immediately with positional/keyword args.

get_result abstractmethod async

get_result(
    task_id: str, timeout: float | None = None
) -> TaskResult

Get task result. Non-blocking if timeout is None.

revoke abstractmethod async

revoke(task_id: str) -> bool

Cancel a pending task. Returns True if revoked.

queue_length abstractmethod async

queue_length(queue_name: str = 'default') -> int

Get number of pending tasks in queue.

health_check abstractmethod async

health_check() -> bool

Check if task queue backend is reachable.

schedule_periodic async

schedule_periodic(
    task_name: str,
    cron: str | None = None,
    interval: timedelta | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
) -> str

Schedule a periodic task.

Not all async backends support this natively. arq supports cron-based scheduling; others may require external schedulers (APScheduler, K8s CronJob).

Parameters:

Name Type Description Default
cron str | None

Cron expression (e.g., "*/5 * * * *")

None
interval timedelta | None

Alternative to cron — fixed interval

None

unschedule async

unschedule(schedule_id: str) -> bool

Remove a periodic schedule.

startup async

startup() -> None

Initialize connections (called once at app startup).

shutdown async

shutdown() -> None

Close connections (called once at app shutdown).
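
Since startup and shutdown are each meant to run once, wrapping them in an async context manager keeps the pair balanced even when the application body raises. The sketch below uses a hypothetical FakeAsyncQueue and queue_lifespan helper. How a given web framework hooks into its own lifespan events is outside this page.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncQueue:
    """Hypothetical adapter that only tracks its connection state."""

    def __init__(self) -> None:
        self.connected = False
        self.events: list[str] = []

    async def startup(self) -> None:
        self.connected = True
        self.events.append("startup")

    async def shutdown(self) -> None:
        self.connected = False
        self.events.append("shutdown")


@asynccontextmanager
async def queue_lifespan(queue: FakeAsyncQueue):
    await queue.startup()
    try:
        yield queue
    finally:
        # Runs even if the application body raises.
        await queue.shutdown()


async def main() -> list[str]:
    queue = FakeAsyncQueue()
    async with queue_lifespan(queue) as q:
        q.events.append("serving" if q.connected else "not-connected")
    return queue.events


events = asyncio.run(main())
```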