Skip to content

Defining tasks

A task is a Python function registered with an Ardiq app via the @app.task decorator. Registering it returns a Task object you can .enqueue(...) or call inline.

from ardiq import Ardiq
app = Ardiq(queue_name="emails")
@app.task()
async def send_welcome(user_id: int) -> None:
...

ArdiQ decides how to run a task at registration time, based on whether it’s a coroutine function:

  • async def tasks run directly on the worker’s event loop.
  • def (sync) tasks are run in a thread pool (asyncio.to_thread), so a blocking call never freezes the loop or the rest of the worker’s concurrency.
@app.task()
async def fetch(url: str) -> str: # runs on the loop
...
@app.task()
def resize_image(path: str) -> str: # runs in a thread
...

@app.task(...) accepts:

OptionTypeDefaultDescription
namestrfunction nameThe name used on the wire and in the registry. Required if the callable has no __name__.
max_retriesint3How many times to retry after the first attempt fails.
backoff_msint0Delay between retries in ms. 0 uses the core’s default backoff.
timeoutfloat | NoneNonePer-task timeout in seconds. A task that exceeds it fails (and may retry).
prioritystr | NoneNoneDefault priority lane for this task (see Priorities).
@app.task(name="email.welcome", max_retries=5, backoff_ms=2000, timeout=30)
async def send_welcome(user_id: int) -> None:
...

When a task raises, ArdiQ retries it up to max_retries times before recording a failure. Each attempt increments tries (visible on the TaskResult).

@app.task(max_retries=3, backoff_ms=1000)
async def charge(order_id: int) -> None:
# raises on a transient error → retried up to 3 times, 1s apart (then backoff grows)
...

A task that still fails after its last retry stores a failed TaskResult whose value is the error’s repr.

A timeout (in seconds) caps how long a single attempt may run. If it’s exceeded the attempt is cancelled and treated as a failure — so it follows the same retry rules:

@app.task(timeout=10, max_retries=2)
async def call_flaky_api() -> dict:
...

The failed result’s value reads timed out after 10s.

An app is created with a list of priority lanes, lowest-first:

app = Ardiq(priorities=["low", "default", "high"])

Higher-priority lanes are consumed first. A task can declare a default lane, and any individual enqueue can override it:

@app.task(priority="high")
async def urgent(...): ...
# override per call
await urgent.options(priority="low").enqueue(...)

See Enqueuing & scheduling for per-call overrides.

To run a task on a schedule instead of on demand, register it with @app.cron (a cron expression or an every= interval) — see Recurring tasks.

A registered task is still a normal callable — calling it runs the function directly, bypassing the queue entirely. This is handy in tests:

result = await add(2, 3) # runs now, in-process; no Redis involved

To actually dispatch it to a worker, use .enqueue(...).