Enqueuing & scheduling

Calling await task.enqueue(*args, **kwargs) serializes the arguments, pushes the job onto Redis, and returns a Job handle you can use to read status and results.

job = await add.enqueue(2, 3)
print(job.id) # a uuid hex, unless you set one yourself

Enqueuing is async because the round-trip to Redis is async, so it must be awaited from a coroutine running in an event loop.
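If the calling code is synchronous (a script or a CLI command, say), one way to get an event loop is asyncio.run. The sketch below uses only the standard library; fake_enqueue is a hypothetical stand-in for a call such as add.enqueue(2, 3) and is not part of the library.

```python
import asyncio


async def fake_enqueue(x: int, y: int) -> str:
    """Hypothetical stand-in for `await add.enqueue(x, y)`."""
    await asyncio.sleep(0)  # simulates the awaitable round-trip to Redis
    return f"job-{x}-{y}"


async def main() -> list[str]:
    # Inside a coroutine, enqueue calls can be awaited one at a time...
    first = await fake_enqueue(2, 3)
    # ...or issued concurrently; gather returns results in argument order.
    rest = await asyncio.gather(fake_enqueue(4, 5), fake_enqueue(6, 7))
    return [first, *rest]


def enqueue_from_sync_code() -> list[str]:
    # asyncio.run creates an event loop, runs main() to completion,
    # and closes the loop afterwards.
    return asyncio.run(main())
```

Inside a web handler or another coroutine the loop is already running, so a plain await is enough there.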

For one-off overrides, chain .options(...) before .enqueue(...):

await add.options(priority="high", delay_ms=5000).enqueue(2, 3)

.options(...) accepts:

Option       Type        Default         Description
task_id      str | None  a uuid          Set your own job id; also used for deduplication.
priority     str | None  task’s default  Override the priority lane for this call.
delay_ms     int         0               Wait this many ms from now before the task becomes runnable.
schedule_ms  int         0               Run at this absolute epoch-ms timestamp.
expire_ms    int         0               Drop the job if it hasn’t started within this window.

Run something after a relative delay:

# fire in 30 seconds
await reminder.options(delay_ms=30_000).enqueue(user_id)

Run something at a specific wall-clock time, using an absolute timestamp in epoch ms:

import time
run_at = int(time.time() * 1000) + 3_600_000 # one hour from now
await digest.options(schedule_ms=run_at).enqueue()
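When the target time comes from a calendar value rather than an offset from now, it has to be converted to epoch milliseconds first. A minimal standard-library sketch follows; to_epoch_ms is a hypothetical helper name, not something the library provides. It uses integer timedelta division to avoid float rounding and rejects naive datetimes, which Python would otherwise interpret as local time.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(when: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if when.tzinfo is None:
        # A naive datetime carries no offset, so the result would depend
        # on the machine's local timezone.
        raise ValueError("expected a timezone-aware datetime")
    # timedelta // timedelta yields an exact int (floored to the millisecond).
    return (when - EPOCH) // timedelta(milliseconds=1)
```

The result can then be passed as schedule_ms, for example digest.options(schedule_ms=to_epoch_ms(run_at)).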

While a job is waiting in the delayed/scheduled set, its status is scheduled.

Setting task_id lets you control the job id, which makes an enqueue idempotent: enqueuing again with the same id won’t create a duplicate job.

await sync_account.options(task_id=f"sync:{account_id}").enqueue(account_id)
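For deduplication to work, the same logical call has to produce the same id every time. When a task takes several arguments, one option is to derive the id from them. The sketch below is a hypothetical helper (stable_task_id is not part of the library) and assumes the arguments are JSON-serializable.

```python
import hashlib
import json


def stable_task_id(prefix: str, *args, **kwargs) -> str:
    """Build the same id for the same prefix and arguments (hypothetical helper)."""
    # sort_keys makes the serialized text independent of keyword order.
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
    # A truncated SHA-256 keeps the id short while staying deterministic.
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{digest}"
```

The returned string can be passed as task_id, for example sync_account.options(task_id=stable_task_id("sync", account_id)). A readable id such as f"sync:{account_id}" is simpler whenever a single key identifies the work.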

expire_ms drops a job that has waited too long to start, which is useful for work that is worthless once it is stale:

# if no worker picks it up within 60s, forget it
await notify.options(expire_ms=60_000).enqueue(user_id)
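The window can be pictured as a simple timestamp comparison. The sketch below is a conceptual model only, not the library's code. It assumes the window is measured from the moment of enqueue and that the default of 0 means "never expire"; neither detail is stated above.

```python
def is_stale(enqueued_at_ms: int, expire_ms: int, now_ms: int) -> bool:
    """Return True if a job that has not started yet should be dropped.

    Assumptions (not confirmed by the docs): the window starts at enqueue
    time, and expire_ms == 0 disables expiry.
    """
    if expire_ms <= 0:
        return False
    return now_ms - enqueued_at_ms > expire_ms
```

Under these assumptions, a job enqueued with expire_ms=60_000 is still runnable 59 seconds later and is discarded once more than 60 seconds have passed.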

Higher-priority lanes are drained first. Define the lanes (lowest-first) on the app, then target one per task or per call:

app = Ardiq(priorities=["low", "default", "high"])
await report.options(priority="low").enqueue() # batch work
await alert.options(priority="high").enqueue() # jump the queue

See Defining tasks for setting a task’s default lane.
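The drain order described above can be modelled in memory. The sketch below is a toy illustration, not the library's implementation. It assumes strict priority (a lower lane is served only when every higher lane is empty) and first-in, first-out order within a lane; LaneQueue is a hypothetical name.

```python
from collections import deque
from typing import Optional


class LaneQueue:
    """Toy in-memory model of priority lanes (illustration only)."""

    def __init__(self, priorities: list[str]) -> None:
        # Lanes are passed lowest-first, as in Ardiq(priorities=[...]);
        # reverse them once so pop() can scan highest-first.
        self._order = list(reversed(priorities))
        self._lanes: dict[str, deque] = {name: deque() for name in priorities}

    def push(self, job: str, priority: str = "default") -> None:
        # Raises KeyError if the lane was never defined.
        self._lanes[priority].append(job)

    def pop(self) -> Optional[str]:
        # Take the oldest job from the highest non-empty lane.
        for name in self._order:
            if self._lanes[name]:
                return self._lanes[name].popleft()
        return None  # every lane is empty
```

In this model a job pushed to "high" is popped before anything already waiting in "default" or "low", which matches the "jump the queue" comment in the example above.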

.enqueue(...) returns a Job as soon as the job has been pushed to Redis; it does not wait for the task to run. To get the outcome, see Results & introspection:

job = await add.enqueue(2, 3)
result = await job.result(timeout=5) # waits up to 5s
print(result.value) # 5