Enqueuing & scheduling
Calling await task.enqueue(*args, **kwargs) serializes the arguments, pushes the job onto
Redis, and returns a Job handle you can use to read status and
results.
job = await add.enqueue(2, 3)print(job.id) # a uuid hex, unless you set one yourselfEnqueuing is async because the round-trip to Redis is async — call it from within an event loop.
Per-call options
Section titled “Per-call options”For one-off overrides, chain .options(...) before .enqueue(...):
await add.options(priority="high", delay_ms=5000).enqueue(2, 3).options(...) accepts:
| Option | Type | Default | Description |
|---|---|---|---|
task_id | str | None | a uuid | Set your own job id — also used for deduplication. |
priority | str | None | task’s default | Override the priority lane for this call. |
delay_ms | int | 0 | Wait this many ms from now before the task becomes runnable. |
schedule_ms | int | 0 | Run at this absolute epoch-ms timestamp. |
expire_ms | int | 0 | Drop the job if it hasn’t started within this window. |
Delayed tasks
Section titled “Delayed tasks”Run something after a relative delay:
# fire in 30 secondsawait reminder.options(delay_ms=30_000).enqueue(user_id)Scheduled tasks
Section titled “Scheduled tasks”Run something at a specific wall-clock time, using an absolute timestamp in epoch ms:
import time
run_at = int(time.time() * 1000) + 3_600_000 # one hour from nowawait digest.options(schedule_ms=run_at).enqueue()While a job is waiting in the delayed/scheduled set, its status is
scheduled.
Custom ids & deduplication
Section titled “Custom ids & deduplication”Setting task_id lets you control the job id — useful to make an enqueue idempotent: the
same id won’t create a duplicate job.
await sync_account.options(task_id=f"sync:{account_id}").enqueue(account_id)Expiry
Section titled “Expiry”expire_ms drops a job that has been waiting too long to start — useful for work that’s
worthless if it’s stale:
# if no worker picks it up within 60s, forget itawait notify.options(expire_ms=60_000).enqueue(user_id)Priorities
Section titled “Priorities”Higher-priority lanes are drained first. Define the lanes (lowest-first) on the app, then target one per task or per call:
app = Ardiq(priorities=["low", "default", "high"])
await report.options(priority="low").enqueue() # batch workawait alert.options(priority="high").enqueue() # jump the queueSee Defining tasks for setting a task’s default lane.
Reading the result
Section titled “Reading the result”.enqueue(...) returns immediately with a Job. To get the outcome, see
Results & introspection:
job = await add.enqueue(2, 3)result = await job.result(timeout=5) # waits up to 5sprint(result.value) # 5