Python API

Everything public is importable from the top-level package:

from ardiq import Ardiq, Task, Job, TaskResult, TaskInfo

Ardiq is the app object: it owns the Rust core, the task registry, and the wire codec.

Ardiq(
    redis_url: str | None = None,
    queue_name: str = "default",
    priorities: list[str] | None = None,
    *,
    serializer: Callable[[Any], bytes] | None = None,
    deserializer: Callable[[bytes], Any] | None = None,
    cron_poll_s: float = 1.0,
    **core_kwargs: Any,
)

Constructor arguments are documented in Configuration (core_kwargs covers concurrency, prefetch, idle_timeout_ms, result_ttl_ms, burst).
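The serializer and deserializer parameters are typed as a pair of callables, `Callable[[Any], bytes]` and `Callable[[bytes], Any]`. As a rough sketch of what such a pair could look like, here is a JSON-based codec. The function names and the sample payload shape are hypothetical; what the library actually passes to these callables is not specified on this page.

```python
import json
from typing import Any


def json_serializer(obj: Any) -> bytes:
    """Encode a JSON-representable object as compact UTF-8 bytes."""
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


def json_deserializer(data: bytes) -> Any:
    """Decode bytes produced by json_serializer back into Python objects."""
    return json.loads(data.decode("utf-8"))


# Hypothetical payload, used only to show the round trip.
payload = {"args": [2, 3], "kwargs": {"scale": 1.5}}
encoded = json_serializer(payload)
decoded = json_deserializer(encoded)
```

A pair like this would presumably be passed as `serializer=json_serializer, deserializer=json_deserializer`. Note that JSON is lossy for some Python types: tuples come back as lists, and non-string dict keys become strings.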

| Property | Type | Description |
| --- | --- | --- |
| worker_id | str | This worker’s id (set by the core). |
| burst | bool | Read/write; when True the loop exits once the queue drains. |
| tasks | list[str] | Names of the registered tasks. |

The task decorator registers a function as a task and returns a Task.

def task(
    fn: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    max_retries: int = 3,
    backoff_ms: int = 0,
    timeout: float | None = None,
    priority: str | None = None,
) -> Task

Usable bare (@app.task) or called (@app.task(max_retries=5)). See Defining tasks.
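The bare-or-called behaviour follows from the signature: `fn` is an optional positional argument and everything else is keyword-only. The sketch below shows the general pattern with a hypothetical `MiniRegistry` stand-in. It is not the library's implementation, and unlike the real decorator it returns the plain function rather than a Task.

```python
from typing import Any, Callable, Dict, Optional


class MiniRegistry:
    """Hypothetical stand-in for an app; it only records registered tasks."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Dict[str, Any]] = {}

    def task(
        self,
        fn: Optional[Callable[..., Any]] = None,
        *,
        name: Optional[str] = None,
        max_retries: int = 3,
    ):
        def register(func: Callable[..., Any]) -> Callable[..., Any]:
            # Fall back to the function's own name when none is given.
            self.tasks[name or func.__name__] = {
                "fn": func,
                "max_retries": max_retries,
            }
            return func

        # Bare use (@registry.task): Python passes the function as fn.
        if fn is not None:
            return register(fn)
        # Called use (@registry.task(...)): return the decorator itself.
        return register


registry = MiniRegistry()


@registry.task
def add(a, b):
    return a + b


@registry.task(name="mul", max_retries=5)
def multiply(a, b):
    return a * b
```

Making the options keyword-only is what keeps the two forms unambiguous: the only thing that can arrive positionally is the decorated function.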

cron(...) returns a decorator that registers a recurring task and returns a Task; see Recurring tasks.

def cron(
    spec: str | None = None,
    *,
    every: timedelta | float | None = None,
    name: str | None = None,
    max_retries: int = 3,
    backoff_ms: int = 0,
    timeout: float | None = None,
    priority: str | None = None,
) -> Callable[..., Task]

Pass exactly one of spec (a 5-field cron expression, UTC) or every (seconds or a timedelta).
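The "exactly one of" rule can be illustrated with a small validation helper. This is a sketch only: `normalize_schedule` is a hypothetical name, the error messages are made up, and the positivity check on `every` is an assumption rather than documented behaviour. It also checks only the field count of the cron expression, not its contents.

```python
from datetime import timedelta
from typing import Optional, Tuple, Union


def normalize_schedule(
    spec: Optional[str] = None,
    *,
    every: Union[timedelta, float, None] = None,
) -> Tuple[str, Union[str, float]]:
    """Return ("cron", spec) or ("every", seconds); reject zero or two inputs."""
    # True when both are missing or both are present.
    if (spec is None) == (every is None):
        raise ValueError("pass exactly one of spec or every")

    if spec is not None:
        fields = spec.split()
        if len(fields) != 5:
            raise ValueError(f"expected 5 cron fields, got {len(fields)}")
        return ("cron", " ".join(fields))

    # Accept either a timedelta or a plain number of seconds.
    seconds = every.total_seconds() if isinstance(every, timedelta) else float(every)
    if seconds <= 0:  # assumption: a non-positive interval is not meaningful
        raise ValueError("every must be positive")
    return ("every", seconds)


every_minute = normalize_schedule(every=timedelta(minutes=1))
every_five = normalize_schedule("*/5 * * * *")
```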

| Method | Returns | Description |
| --- | --- | --- |
| await run() | None | Start the worker loop; runs until stop() or (in burst) the queue drains. |
| stop() | None | Ask the loop to shut down gracefully. |
| await queue_size() | int | Number of jobs waiting across lanes. |
| await result(task_id, timeout=None) | TaskResult \| None | Fetch a result. With timeout (seconds), waits for it; otherwise returns immediately with the result or None. |
| await status(task_id) | str | One of queued, scheduled, running, complete, or not_found. |
| await info(task_id) | TaskInfo \| None | Snapshot of an unfinished task, else None. |

Task is a registered task, returned by @app.task. Call it to run inline; use its async methods to dispatch.

| Member | Description |
| --- | --- |
| name | The registered name. |
| fn | The underlying function. |
| priority | The task’s default priority lane (or None). |
| task(*args, **kwargs) | Calling the Task runs fn inline, bypassing the queue. |
| await enqueue(*args, **kwargs) | Dispatch to a worker; returns a Job. |
| options(...) | Returns a bound task with per-call overrides; see below. |

def options(
    *,
    task_id: str | None = None,
    priority: str | None = None,
    delay_ms: int = 0,
    schedule_ms: int = 0,
    expire_ms: int = 0,
) -> _BoundTask

options returns an object with the same await enqueue(*args, **kwargs) method that carries the overrides. See Enqueuing & scheduling.

await add.options(priority="high", delay_ms=5000).enqueue(2, 3)
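To make the "bound task" idea concrete, the sketch below models it with two hypothetical classes, `FakeTask` and `BoundTask`. Instead of talking to Redis they append to an in-memory list, and only two of the overrides are modelled. This is an illustration of the pattern under those assumptions, not the library's `_BoundTask`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in: records what would be sent to the queue."""

    name: str
    priority: Optional[str] = None
    sent: List[Dict[str, Any]] = field(default_factory=list)

    async def _send(self, args, kwargs, overrides) -> None:
        self.sent.append(
            {"name": self.name, "args": args, "kwargs": kwargs, **overrides}
        )

    async def enqueue(self, *args, **kwargs) -> None:
        # Plain enqueue uses the task's default priority lane.
        await self._send(args, kwargs, {"priority": self.priority})

    def options(self, *, priority: Optional[str] = None, delay_ms: int = 0):
        # Capture the overrides now; they are applied when enqueue is awaited.
        return BoundTask(
            self, {"priority": priority or self.priority, "delay_ms": delay_ms}
        )


@dataclass(frozen=True)
class BoundTask:
    """Same enqueue interface as the task, plus the captured overrides."""

    task: FakeTask
    overrides: Dict[str, Any]

    async def enqueue(self, *args, **kwargs) -> None:
        await self.task._send(args, kwargs, self.overrides)


add = FakeTask("add", priority="low")
asyncio.run(add.options(priority="high", delay_ms=5000).enqueue(2, 3))
asyncio.run(add.enqueue(1, 1))
```

Because the overrides live on the bound object rather than on the task, the second call still goes out with the task's default priority.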

Job is an immutable handle to an enqueued task: just the app plus an id.

| Member | Returns | Description |
| --- | --- | --- |
| app | Ardiq | The owning app. |
| id | str | The job id. |
| await result(timeout=None) | TaskResult \| None | Fetch the result; with timeout (seconds), waits and raises TimeoutError if the timeout elapses. |
| await status() | str | Current status. |
| await info() | TaskInfo \| None | Snapshot if unfinished, else None. |

TaskResult is a NamedTuple describing a finished task.

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether the task returned (vs failed after retries). |
| value | Any | Return value on success; error repr on failure. |
| tries | int | Number of attempts. |
| enqueue_time | int | Epoch ms when enqueued. |
| start | int | Epoch ms when execution started. |
| finish | int | Epoch ms when execution finished. |
| duration_ms | int | Property: finish - start. |
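Since duration_ms is described as a property rather than a stored field, it is derived from the two timestamps. The sketch below mirrors the documented fields in a hypothetical `ResultSketch` class to show how a NamedTuple can carry such a property. The `queue_wait_ms` helper is an extra invented for illustration and is not part of the documented API.

```python
from typing import Any, NamedTuple


class ResultSketch(NamedTuple):
    """Hypothetical mirror of the documented fields, for illustration only."""

    success: bool
    value: Any
    tries: int
    enqueue_time: int  # epoch ms
    start: int  # epoch ms
    finish: int  # epoch ms

    @property
    def duration_ms(self) -> int:
        # Derived value: not stored in the tuple itself.
        return self.finish - self.start

    @property
    def queue_wait_ms(self) -> int:
        # Hypothetical helper: time spent waiting before execution began.
        return self.start - self.enqueue_time


r = ResultSketch(
    success=True,
    value=5,
    tries=1,
    enqueue_time=1_700_000_000_000,
    start=1_700_000_000_250,
    finish=1_700_000_000_900,
)
```

With these sample timestamps, `r.duration_ms` is 650 and `r.queue_wait_ms` is 250. Properties defined this way do not appear in `_fields` and are not included when the tuple is unpacked.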

TaskInfo is a NamedTuple snapshot of an unfinished task (queued, scheduled, or running).

| Field | Type | Description |
| --- | --- | --- |
| task_id | str | The job id. |
| fn_name | str | Registered task name. |
| args | tuple | Positional arguments. |
| kwargs | dict | Keyword arguments. |
| enqueue_time | int | Epoch ms when enqueued. |
| tries | int | Attempts so far. |
| status | str | Current status. |
| scheduled_at | int \| None | Epoch ms if waiting in the delayed set, else None. |