W3docs

Python asyncio: async and await

Learn Python asyncio from scratch: coroutines, the event loop, tasks, gather, timeouts, and queues — with runnable examples and clear explanations.

Python's asyncio module lets you write concurrent code in a single thread using the async and await keywords. Instead of blocking while it waits for a network response or other I/O, an asyncio program suspends the waiting coroutine and switches to other ready work, resuming it when the result is available. This makes asyncio a good fit for I/O-bound programs such as web scrapers, API clients, and chat servers.

This chapter covers:

  • What async functions (coroutines) are and how they differ from regular functions
  • The event loop and how asyncio schedules work
  • Awaiting results, running tasks concurrently with asyncio.gather and asyncio.create_task
  • Handling exceptions and timeouts inside async code
  • The asyncio.Queue for producer-consumer patterns
  • When to use asyncio and when to reach for threading instead

Why asyncio exists

Consider a program that calls two APIs one after another:

import time

def fetch(name, delay):
    time.sleep(delay)          # blocks the whole program
    return f'data from {name}'

start = time.perf_counter()
r1 = fetch('API A', 1)
r2 = fetch('API B', 1)
print(f'Done in {time.perf_counter() - start:.1f}s')
# Done in 2.0s

Both calls run sequentially — 2 seconds total even though each call only needs 1 second of waiting. With asyncio the program pauses fetch('API A', ...) while it waits, starts fetch('API B', ...) immediately, and both finish in about 1 second:

import asyncio
import time

async def fetch(name, delay):
    await asyncio.sleep(delay)   # suspends only this coroutine
    return f'data from {name}'

async def main():
    start = time.perf_counter()
    r1, r2 = await asyncio.gather(fetch('API A', 1), fetch('API B', 1))
    print(f'Done in {time.perf_counter() - start:.1f}s')
    # Done in 1.0s

asyncio.run(main())

Coroutines: async def and await

A function defined with async def is called a coroutine function. Calling it does not run the body immediately — it returns a coroutine object that must be driven by the event loop.

import asyncio

async def greet(name):
    print(f'Hello, {name}!')

# Calling it returns a coroutine object, nothing is printed yet
coro = greet('World')
print(type(coro))   # <class 'coroutine'>
coro.close()        # discard it to avoid a "never awaited" warning

# Run it properly
asyncio.run(greet('World'))
# Hello, World!

Inside a coroutine, await suspends execution until the awaitable (another coroutine, a Task, or a Future) produces a result. The event loop is free to run other coroutines while one is suspended.

import asyncio

async def step_one():
    print('Step 1: start')
    await asyncio.sleep(1)     # suspend for 1 second
    print('Step 1: end')
    return 'result-1'

async def main():
    value = await step_one()   # wait for step_one to finish
    print(value)

asyncio.run(main())
# Step 1: start
# Step 1: end
# result-1

What you can await

  • Another async def coroutine
  • An asyncio.Task (created with asyncio.create_task)
  • An asyncio.Future
  • Any object with an __await__ method

You cannot use await outside an async def function.
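
As a rough illustration of the list above, the sketch below awaits a Future, a Task, and a custom object in turn. The Ready class and the strings it carries are hypothetical names made up for this example; they are not part of asyncio.

```python
import asyncio

class Ready:
    """Hypothetical awaitable: implements __await__ by delegating to a coroutine."""

    def __init__(self, value):
        self.value = value

    async def _resolve(self):
        await asyncio.sleep(0)   # yield to the event loop once
        return self.value

    def __await__(self):
        # __await__ must return an iterator; a coroutine's __await__ provides one
        return self._resolve().__await__()

async def main():
    loop = asyncio.get_running_loop()

    future = loop.create_future()
    loop.call_soon(future.set_result, 'from future')   # resolved on a later loop pass

    task = asyncio.create_task(asyncio.sleep(0, result='from task'))

    return [await future, await task, await Ready('from custom object')]

print(asyncio.run(main()))
# ['from future', 'from task', 'from custom object']
```

In everyday code you will mostly await coroutines and Tasks; Futures and custom __await__ objects tend to show up inside libraries.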

The event loop

The event loop is asyncio's scheduler. It keeps track of the tasks that are ready to run, runs each one until it suspends at an await (for example while waiting on I/O or a timer), then switches to the next ready task. There is typically one event loop per thread.

asyncio.run(coro) is the standard entry point for asyncio programs. It creates a new event loop, runs the given coroutine to completion, closes the loop, and returns the result:

import asyncio

async def compute():
    await asyncio.sleep(0)   # yield control once
    return 6 * 7

result = asyncio.run(compute())
print(result)   # 42

For most applications you never need to manage the loop directly — asyncio.run handles creation and teardown.
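
To make the scheduling described in this section more concrete, here is a minimal sketch in which two coroutines record each step in a shared list and hand control back to the loop with asyncio.sleep(0). The worker function and the log list are illustrative names, not asyncio APIs.

```python
import asyncio

async def worker(name, log):
    for step in range(2):
        log.append(f'{name}{step}')
        await asyncio.sleep(0)   # hand control back to the event loop

async def main():
    log = []
    await asyncio.gather(worker('A', log), worker('B', log))
    return log

print(asyncio.run(main()))
# ['A0', 'B0', 'A1', 'B1']
```

Each worker runs only until its next await, so the entries alternate even though everything happens in one thread.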

Running tasks concurrently

asyncio.gather

asyncio.gather(*aws) schedules all the supplied coroutines (or other awaitables) to run concurrently and returns a list of their results in the order they were passed in, regardless of which one finishes first:

import asyncio

async def fetch_data(name, delay):
    print(f'Start fetching {name}')
    await asyncio.sleep(delay)
    print(f'Done fetching {name}')
    return f'data from {name}'

async def main():
    results = await asyncio.gather(
        fetch_data('API A', 1),
        fetch_data('API B', 2),
        fetch_data('API C', 1),
    )
    print(results)

asyncio.run(main())
# Start fetching API A
# Start fetching API B
# Start fetching API C
# Done fetching API A
# Done fetching API C
# Done fetching API B
# ['data from API A', 'data from API B', 'data from API C']

All three coroutines start immediately. The total elapsed time matches the slowest coroutine (2 s), not the sum (4 s).

asyncio.create_task

asyncio.create_task(coro) wraps a coroutine in a Task and schedules it to run soon. Unlike gather, creating a task fires it off in the background while the current coroutine keeps running:

import asyncio

async def background_job(name, delay):
    print(f'{name}: start')
    await asyncio.sleep(delay)
    print(f'{name}: end')
    return f'{name} done'

async def main():
    t1 = asyncio.create_task(background_job('Task A', 1))
    t2 = asyncio.create_task(background_job('Task B', 2))

    # Both tasks are already scheduled; await collects their results
    result1 = await t1
    result2 = await t2
    print(result1, result2)

asyncio.run(main())
# Task A: start
# Task B: start
# Task A: end
# Task B: end
# Task A done Task B done

Use create_task when you want a task to start immediately and you plan to collect its result (or cancel it) later. Use gather when you want to launch a fixed group of coroutines and wait for all of them together.
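
The "cancel it later" case can be sketched as follows. This is one possible pattern, not the only one; ticker and its log list are hypothetical names used only for the example.

```python
import asyncio

async def ticker(log):
    try:
        while True:
            log.append('tick')
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append('cancelled')   # a chance to clean up
        raise                     # re-raise so the Task is marked as cancelled

async def main():
    log = []
    task = asyncio.create_task(ticker(log))
    await asyncio.sleep(0.05)     # let the background task run for a while
    task.cancel()                 # request cancellation
    try:
        await task                # wait until the cancellation has been processed
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()

log, cancelled = asyncio.run(main())
print(log[0], log[-1], cancelled)
# tick cancelled True
```

Keeping a reference to the Task (here, the task variable) is what makes it possible to cancel or await it later.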

Interleaved output

A useful way to see the event loop in action is to watch how tasks interleave:

import asyncio

async def count_down(name, seconds):
    for i in range(seconds, 0, -1):
        print(f'{name}: {i}')
        await asyncio.sleep(1)
    print(f'{name}: done!')

async def main():
    await asyncio.gather(
        count_down('Task A', 3),
        count_down('Task B', 2),
    )

asyncio.run(main())
# Task A: 3
# Task B: 2
# Task A: 2
# Task B: 1
# Task A: 1
# Task B: done!
# Task A: done!

Both tasks share one thread; the event loop alternates between them at each await asyncio.sleep(1).

Handling exceptions

Exceptions raised inside a coroutine propagate through await just like in synchronous code. Use a regular try/except block:

import asyncio

async def risky_task():
    await asyncio.sleep(0.1)
    raise ValueError('something went wrong')

async def main():
    try:
        await risky_task()
    except ValueError as e:
        print(f'Caught: {e}')

asyncio.run(main())
# Caught: something went wrong

When using asyncio.gather, if one coroutine raises an exception the others are not cancelled by default, but the exception is re-raised when you await the gather call. Pass return_exceptions=True to collect exceptions as return values instead:

import asyncio

async def good():
    return 'ok'

async def bad():
    raise RuntimeError('oops')

async def main():
    results = await asyncio.gather(good(), bad(), return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            print(f'Error: {r}')
        else:
            print(f'Result: {r}')

asyncio.run(main())
# Result: ok
# Error: oops
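
For contrast, the following sketch shows the default behaviour (without return_exceptions=True): the exception surfaces at the await, while the other coroutine keeps running. The function names and delays are made up for illustration.

```python
import asyncio

async def slow_ok(log):
    await asyncio.sleep(0.05)
    log.append('slow_ok finished')
    return 'ok'

async def fails_fast():
    await asyncio.sleep(0.01)
    raise RuntimeError('oops')

async def main():
    log = []
    try:
        await asyncio.gather(slow_ok(log), fails_fast())
    except RuntimeError as exc:
        log.append(f'gather raised: {exc}')
    await asyncio.sleep(0.1)   # slow_ok was not cancelled, so give it time to finish
    return log

print(asyncio.run(main()))
# ['gather raised: oops', 'slow_ok finished']
```

If the remaining work should stop on the first failure, you would need to cancel it yourself.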

Timeouts with asyncio.wait_for

asyncio.wait_for(coro, timeout) runs a coroutine and cancels it if it does not finish within the given number of seconds, raising asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11):

import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return 42

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
        print(result)
    except asyncio.TimeoutError:
        print('Timed out — operation cancelled')

asyncio.run(main())
# Timed out — operation cancelled

This is important for production network code where a hung server would otherwise block a task indefinitely.
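
Because wait_for cancels the inner coroutine, that coroutine gets a chance to clean up before the timeout error reaches the caller. A minimal sketch, assuming a hypothetical slow_operation that records its cleanup in a list:

```python
import asyncio

async def slow_operation(log):
    try:
        await asyncio.sleep(5)
        return 42
    except asyncio.CancelledError:
        log.append('cleanup ran')   # e.g. close a connection here
        raise                       # always re-raise after cleaning up

async def main():
    log = []
    try:
        await asyncio.wait_for(slow_operation(log), timeout=0.05)
    except asyncio.TimeoutError:
        log.append('timed out')
    return log

print(asyncio.run(main()))
# ['cleanup ran', 'timed out']
```

The cleanup entry appears first because wait_for waits for the cancellation to finish before raising the timeout error.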

asyncio.Queue for producer-consumer patterns

asyncio.Queue is an async-aware queue for passing items between coroutines running in the same event loop. Unlike queue.Queue, it is not thread-safe. It is ideal for decoupling producers (code that generates work) from consumers (code that processes it):

import asyncio

async def producer(queue):
    for i in range(1, 4):
        print(f'Produced item {i}')
        await queue.put(i)
        await asyncio.sleep(0.1)
    await queue.put(None)   # sentinel telling the consumer to stop

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f'Consumed item {item}')

async def main():
    q = asyncio.Queue()
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())
# Produced item 1
# Consumed item 1
# Produced item 2
# Consumed item 2
# Produced item 3
# Consumed item 3

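With multiple consumers, a single sentinel stops only one of them. Instead, have each consumer call queue.task_done() after processing an item, and await queue.join() to know when all items have been processed.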
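
One way to apply task_done() and join() with several consumers is sketched below. The worker function, the number of workers, and the doubling "work" are assumptions chosen only to keep the example short.

```python
import asyncio

async def worker(name, queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)          # simulate some I/O
            results.append((name, item * 2))
        finally:
            queue.task_done()                  # mark this item as processed

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(worker(f'w{i}', queue, results))
        for i in range(3)
    ]
    for item in range(6):
        queue.put_nowait(item)

    await queue.join()                         # returns once every item is task_done()

    for w in workers:                          # workers loop forever, so cancel them
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(value for _, value in results)

print(asyncio.run(main()))
# [0, 2, 4, 6, 8, 10]
```

Which worker handles which item is not fixed, so the sketch sorts the values before returning them.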

asyncio vs threading

Both asyncio and Python's threading module allow work to proceed concurrently, but they do so differently:

                   asyncio                                  threading
Concurrency model  Cooperative (coroutines yield at await)  Preemptive (OS switches threads)
Best for           Many I/O-bound tasks (network, disk)     I/O-bound tasks that use blocking libraries
CPU-bound work     Not helpful: still one thread            Not helpful: GIL limits true parallelism
Overhead           Very low (no OS threads)                 Higher (each thread uses OS resources)
Shared state       Safe within one event loop               Requires locks to avoid data races

Use asyncio when you control the I/O code and can use async-compatible libraries (e.g., aiohttp, asyncpg). Use threading when you rely on third-party blocking libraries that cannot be made async.

For true CPU parallelism, reach for multiprocessing or concurrent.futures.ProcessPoolExecutor instead.

Common gotchas

Forgetting await: Calling an async function without await returns a coroutine object and does nothing. Python issues a RuntimeWarning: coroutine '...' was never awaited to help catch this.

import asyncio

async def main():
    asyncio.sleep(1)         # BUG: creates a coroutine object but never runs it
    await asyncio.sleep(1)   # correct: suspends main() for 1 second

Blocking the event loop: Running slow synchronous code (a tight loop, a blocking network call, time.sleep) inside a coroutine freezes the entire event loop. Wrap blocking calls with asyncio.to_thread (Python 3.9+) to run them in a thread pool without blocking:

import asyncio
import time

def blocking_task():
    time.sleep(2)   # simulates a slow blocking operation
    return 'done'

async def main():
    result = await asyncio.to_thread(blocking_task)
    print(result)

asyncio.run(main())
# done
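
Since each to_thread call runs in its own worker thread, several blocking calls can overlap. The sketch below assumes two hypothetical blocking jobs of 0.2 seconds each and measures the total time; exact timings will vary by machine.

```python
import asyncio
import time

def blocking_task(name, delay):
    time.sleep(delay)            # stands in for a blocking library call
    return f'{name} done'

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_task, 'job A', 0.2),
        asyncio.to_thread(blocking_task, 'job B', 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)             # ['job A done', 'job B done']
print(f'{elapsed:.1f}s')   # roughly 0.2s rather than 0.4s
```

This helps with blocking I/O; CPU-heavy functions moved to a thread still compete for the GIL.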

Using asyncio.run inside a running loop: Jupyter notebooks already run an event loop. Use await coro directly in notebook cells, or install nest_asyncio to allow nested loops.
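
The same error can be reproduced outside a notebook by calling asyncio.run from code that is already running in a loop. In this sketch, inner and outer are hypothetical names; the point is only that awaiting the coroutine directly is the fix.

```python
import asyncio

async def inner():
    return 'inner result'

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)        # fails: outer() is already running inside a loop
    except RuntimeError as exc:
        error = str(exc)
    result = await coro          # the fix: await the coroutine directly
    return error, result

error, result = asyncio.run(outer())
print(error)    # asyncio.run() cannot be called from a running event loop
print(result)   # inner result
```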

Quick-reference summary

Pattern                            When to use it
asyncio.run(main())                Start the event loop from synchronous code
await coro                         Run a coroutine and wait for its result
asyncio.gather(*coros)             Run multiple coroutines concurrently, collect all results
asyncio.create_task(coro)          Schedule a coroutine as a background Task
asyncio.wait_for(coro, timeout=N)  Add a deadline to a coroutine
asyncio.Queue                      Decouple producers from consumers
asyncio.to_thread(fn)              Run a blocking function without freezing the loop

Practice

What does 'await asyncio.sleep(1)' do inside a coroutine?