Skip to content

Results & introspection

Every enqueue returns a Job — a lightweight handle (just the app plus a task id) you use to fetch the outcome, the current status, or metadata about a task still in flight.

job = await add.enqueue(2, 3)

await job.result(timeout=...) returns a TaskResult once the task finishes.

# poll once: returns the result now, or None if it isn't ready yet
result = await job.result()
# wait up to 5 seconds, raising TimeoutError if it never lands
result = await job.result(timeout=5)
  • Without timeout it returns the stored result immediately, or None if the task hasn’t completed.
  • With timeout (seconds) it polls until the result is stored, raising TimeoutError if it isn’t in time.
TaskResult(success=True, value=5, tries=1,
enqueue_time=..., start=..., finish=...)
FieldDescription
successTrue if the task returned, False if it failed after all retries.
valueThe return value on success; the error’s repr on failure.
triesHow many attempts it took.
enqueue_timeEpoch ms when the task was enqueued.
startEpoch ms when execution started.
finishEpoch ms when execution finished.
duration_msConvenience property: finish - start.
result = await job.result(timeout=5)
if result.success:
print(result.value, f"in {result.duration_ms}ms after {result.tries} tries")
else:
print("failed:", result.value)

await job.status() returns a string describing where the task is:

StatusMeaning
queuedWaiting in a priority lane, ready to run.
scheduledWaiting in the delayed/scheduled set for its time to come.
runningCurrently being executed by a worker.
completeFinished; a result is available (until its TTL expires).
not_foundUnknown id — never enqueued, or aged out.
print(await job.status()) # 'queued'

await job.info() returns a TaskInfo snapshot of an unfinished task (queued, scheduled, or running), or None if it’s finished or unknown. Use result() for finished tasks and info() to introspect what’s still pending.

info = await job.info()
if info is not None:
print(info.fn_name, info.args, info.kwargs)
print("status:", info.status, "tries:", info.tries)
if info.scheduled_at is not None:
print("scheduled for epoch-ms", info.scheduled_at)
FieldDescription
task_idThe job id.
fn_nameRegistered task name.
argsPositional arguments tuple.
kwargsKeyword arguments dict.
enqueue_timeEpoch ms when it was enqueued.
triesAttempts so far.
statusSame values as status() above.
scheduled_atEpoch ms if it’s waiting in the delayed set, else None.

If you have a task id but not the Job, the same calls exist on the app:

await app.result(task_id, timeout=5)
await app.status(task_id)
await app.info(task_id)
await app.queue_size() # number of jobs waiting across lanes