Getting started

  • Python 3.13+
  • A Redis server (ArdiQ uses Redis streams as its broker and result store).

Terminal window
$ pip install ardiq

ArdiQ ships as a prebuilt wheel with the Rust core baked in — no Rust toolchain needed to use it. The base package is the library and has a single runtime dependency (msgpack): enough to define tasks, enqueue, and run a worker from your own code.

The ardiq worker command used below ships in the cli extra:

Terminal window
$ pip install 'ardiq[cli]'

You also need a Redis server — the quickest way is Docker:

Terminal window
$ docker run -d --name ardiq-redis -p 6379:6379 redis # Redis on localhost:6379

or install it from your package manager (or redis.io).
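
Before starting a worker, it can help to confirm that something is listening on the Redis port. The following is a minimal sketch using only the Python standard library. The helper name `port_open` is hypothetical and not part of ArdiQ, and it only checks that a TCP connection succeeds, not that the server on the other end actually speaks Redis.

```python
import socket


def port_open(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("Redis port reachable:", port_open())
```

If this prints `False`, check that the Docker container is running and that port 6379 is published to the host.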

  1. Define an app and some tasks in a module — say example.py:

    example.py
    from ardiq import Ardiq

    app = Ardiq(redis_url="redis://localhost:6379", queue_name="example")

    @app.task()
    async def add(a: int, b: int) -> int:
        return a + b

    @app.task(max_retries=3)
    def slow_double(x: int) -> int:  # sync task, runs in a thread
        return x * 2
  2. Start a worker that loads app from example.py:

    Terminal window
    $ ardiq run example:app
  3. Enqueue tasks from anywhere — a web handler, a script, a REPL — and read their results:

    import asyncio

    from example import add

    async def main():
        job = await add.enqueue(2, 3)  # returns a Job handle
        print(job.id)
        print(await job.status())  # 'queued' | 'running' | 'complete'
        print(await job.result(timeout=5))  # waits → TaskResult(success=True, value=5, tries=1)

    asyncio.run(main())

You don’t need a separate worker to try things out. Burst mode drains the queue and exits, so you can enqueue and process in a single script:

example.py
import asyncio

from ardiq import Ardiq

app = Ardiq(redis_url="redis://localhost:6379", queue_name="example")

@app.task()
async def add(a: int, b: int) -> int:
    return a + b

async def main() -> None:
    jobs = [await add.enqueue(i, i) for i in range(3)]
    app.burst = True
    await app.run()  # process everything queued, then exit
    for job in jobs:
        print(await job.result())

if __name__ == "__main__":
    asyncio.run(main())

Terminal window
$ python example.py
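
To make the "drain the queue and exit" behaviour concrete, here is a conceptual sketch that uses a plain in-memory `asyncio.Queue` in place of Redis. It is not ArdiQ's implementation. The function `drain_once` and the tuple payloads are hypothetical and only illustrate the control flow: process whatever is already queued, then return instead of waiting for more work.

```python
import asyncio


async def add(a: int, b: int) -> int:
    return a + b


async def drain_once(queue: asyncio.Queue) -> list[int]:
    """Process every item currently queued, then return without waiting for new work."""
    results = []
    while not queue.empty():
        a, b = queue.get_nowait()
        results.append(await add(a, b))
        queue.task_done()
    return results


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait((i, i))  # same arguments as the burst example: add(i, i)
    return await drain_once(queue)


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4]
```

A long-running worker would instead block waiting for the next item, which is why it never returns on its own.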