Python asyncio: async and await
Learn Python asyncio from scratch: coroutines, the event loop, tasks, gather, timeouts, and queues — with runnable examples and clear explanations.
Python's asyncio module lets you write concurrent code in a single thread using the async and await keywords. Instead of blocking while waiting for network responses or file reads, an asyncio program suspends the waiting task and immediately switches to other work — resuming when the result is ready. This makes it the right tool for I/O-bound programs such as web scrapers, API clients, and chat servers.
This chapter covers:
- What
asyncfunctions (coroutines) are and how they differ from regular functions - The event loop and how asyncio schedules work
- Awaiting results, running tasks concurrently with
asyncio.gatherandasyncio.create_task - Handling exceptions and timeouts inside async code
- The
asyncio.Queuefor producer-consumer patterns - When to use asyncio and when to reach for threading instead
Why asyncio exists
Consider a program that calls two APIs one after another:
import time
def fetch(name, delay):
time.sleep(delay) # blocks the whole program
return f'data from {name}'
start = time.perf_counter()
r1 = fetch('API A', 1)
r2 = fetch('API B', 1)
print(f'Done in {time.perf_counter() - start:.1f}s')
# Done in 2.0sBoth calls run sequentially — 2 seconds total even though each call only needs 1 second of waiting. With asyncio the program pauses fetch('API A', ...) while it waits, starts fetch('API B', ...) immediately, and both finish in about 1 second:
import asyncio
import time
async def fetch(name, delay):
await asyncio.sleep(delay) # suspends only this coroutine
return f'data from {name}'
async def main():
start = time.perf_counter()
r1, r2 = await asyncio.gather(fetch('API A', 1), fetch('API B', 1))
print(f'Done in {time.perf_counter() - start:.1f}s')
# Done in 1.0s
asyncio.run(main())Coroutines: async def and await
A function defined with async def is called a coroutine function. Calling it does not run the body immediately — it returns a coroutine object that must be driven by the event loop.
async def greet(name):
print(f'Hello, {name}!')
# Calling it returns a coroutine object, nothing is printed yet
coro = greet('World')
print(type(coro)) # <class 'coroutine'>
# Run it properly
import asyncio
asyncio.run(greet('World'))
# Hello, World!Inside a coroutine, await suspends execution until the awaitable (another coroutine, a Task, or a Future) produces a result. The event loop is free to run other coroutines while one is suspended.
import asyncio
async def step_one():
print('Step 1: start')
await asyncio.sleep(1) # suspend for 1 second
print('Step 1: end')
return 'result-1'
async def main():
value = await step_one() # wait for step_one to finish
print(value)
asyncio.run(main())
# Step 1: start
# Step 1: end
# result-1What you can await
- Another
async defcoroutine - An
asyncio.Task(created withasyncio.create_task) - An
asyncio.Future - Any object with an
__await__method
You cannot use await outside an async def function.
The event loop
The event loop is asyncio's scheduler. It keeps a queue of coroutines and tasks, runs each one until it hits an await, then switches to the next ready item. There is typically one event loop per thread.
asyncio.run(coro) is the standard entry point for asyncio programs. It creates a new event loop, runs the given coroutine to completion, closes the loop, and returns the result:
import asyncio
async def compute():
await asyncio.sleep(0) # yield control once
return 6 * 7
result = asyncio.run(compute())
print(result) # 42For most applications you never need to manage the loop directly — asyncio.run handles creation and teardown.
Running tasks concurrently
asyncio.gather
asyncio.gather(*coroutines) schedules all the supplied coroutines to run concurrently and returns their results in the same order:
import asyncio
async def fetch_data(name, delay):
print(f'Start fetching {name}')
await asyncio.sleep(delay)
print(f'Done fetching {name}')
return f'data from {name}'
async def main():
results = await asyncio.gather(
fetch_data('API A', 1),
fetch_data('API B', 2),
fetch_data('API C', 1),
)
print(results)
asyncio.run(main())
# Start fetching API A
# Start fetching API B
# Start fetching API C
# Done fetching API A
# Done fetching API C
# Done fetching API B
# ['data from API A', 'data from API B', 'data from API C']All three coroutines start immediately. The total elapsed time matches the slowest coroutine (2 s), not the sum (4 s).
asyncio.create_task
asyncio.create_task(coro) wraps a coroutine in a Task and schedules it to run soon. Unlike gather, creating a task fires it off in the background while the current coroutine keeps running:
import asyncio
async def background_job(name, delay):
print(f'{name}: start')
await asyncio.sleep(delay)
print(f'{name}: end')
return f'{name} done'
async def main():
t1 = asyncio.create_task(background_job('Task A', 1))
t2 = asyncio.create_task(background_job('Task B', 2))
# Both tasks are already scheduled; await collects their results
result1 = await t1
result2 = await t2
print(result1, result2)
asyncio.run(main())
# Task A: start
# Task B: start
# Task A: end
# Task B: end
# Task A done Task B doneUse create_task when you want a task to start immediately and you plan to collect its result (or cancel it) later. Use gather when you want to launch a fixed group of coroutines and wait for all of them together.
Interleaved output
A useful way to see the event loop in action is to watch how tasks interleave:
import asyncio
async def count_down(name, seconds):
for i in range(seconds, 0, -1):
print(f'{name}: {i}')
await asyncio.sleep(1)
print(f'{name}: done!')
async def main():
await asyncio.gather(
count_down('Task A', 3),
count_down('Task B', 2),
)
asyncio.run(main())
# Task A: 3
# Task B: 2
# Task A: 2
# Task B: 1
# Task A: 1
# Task B: done!
# Task A: done!Both tasks share one thread; the event loop alternates between them at each await asyncio.sleep(1).
Handling exceptions
Exceptions raised inside a coroutine propagate through await just like in synchronous code. Use a regular try/except block:
import asyncio
async def risky_task():
await asyncio.sleep(0.1)
raise ValueError('something went wrong')
async def main():
try:
await risky_task()
except ValueError as e:
print(f'Caught: {e}')
asyncio.run(main())
# Caught: something went wrongWhen using asyncio.gather, if one coroutine raises an exception the others are not cancelled by default, but the exception is re-raised when you await the gather call. Pass return_exceptions=True to collect exceptions as return values instead:
import asyncio
async def good():
return 'ok'
async def bad():
raise RuntimeError('oops')
async def main():
results = await asyncio.gather(good(), bad(), return_exceptions=True)
for r in results:
if isinstance(r, Exception):
print(f'Error: {r}')
else:
print(f'Result: {r}')
asyncio.run(main())
# Result: ok
# Error: oopsTimeouts with asyncio.wait_for
asyncio.wait_for(coro, timeout) runs a coroutine and cancels it if it does not finish within the given number of seconds, raising asyncio.TimeoutError:
import asyncio
async def slow_operation():
await asyncio.sleep(5)
return 42
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=1.0)
print(result)
except asyncio.TimeoutError:
print('Timed out — operation cancelled')
asyncio.run(main())
# Timed out — operation cancelledThis is important for production network code where a hung server would otherwise block a task indefinitely.
asyncio.Queue for producer-consumer patterns
asyncio.Queue is a thread-safe, async-aware queue. It is ideal for decoupling producers (code that generates work) from consumers (code that processes it):
import asyncio
async def producer(queue):
for i in range(1, 4):
print(f'Produced item {i}')
await queue.put(i)
await asyncio.sleep(0.1)
await queue.put(None) # sentinel to signal consumers to stop
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f'Consumed item {item}')
async def main():
q = asyncio.Queue()
await asyncio.gather(producer(q), consumer(q))
asyncio.run(main())
# Produced item 1
# Consumed item 1
# Produced item 2
# Consumed item 2
# Produced item 3
# Consumed item 3For multiple consumers, use queue.task_done() and queue.join() to know when all items have been processed.
asyncio vs threading
Both asyncio and Python's threading module allow work to proceed concurrently, but they do so differently:
| asyncio | threading | |
|---|---|---|
| Concurrency model | Cooperative (coroutines yield at await) | Preemptive (OS switches threads) |
| Best for | Many I/O-bound tasks (network, disk) | I/O-bound tasks that use blocking libraries |
| CPU-bound work | Not helpful — still one thread | Not helpful — GIL limits true parallelism |
| Overhead | Very low (no OS threads) | Higher (each thread uses OS resources) |
| Shared state | Safe within one event loop | Requires locks to avoid data races |
Use asyncio when you control the I/O code and can use async-compatible libraries (e.g., aiohttp, asyncpg). Use threading when you rely on third-party blocking libraries that cannot be made async.
For true CPU parallelism, reach for multiprocessing or concurrent.futures.ProcessPoolExecutor instead.
Common gotchas
Forgetting await: Calling an async function without await returns a coroutine object and does nothing. Python issues a RuntimeWarning: coroutine '...' was never awaited to help catch this.
async def main():
asyncio.sleep(1) # BUG: returns a coroutine, does not sleep
await asyncio.sleep(1) # correctBlocking the event loop: Running slow synchronous code (a tight loop, a blocking network call, time.sleep) inside a coroutine freezes the entire event loop. Wrap blocking calls with asyncio.to_thread (Python 3.9+) to run them in a thread pool without blocking:
import asyncio
import time
def blocking_task():
time.sleep(2) # simulates a slow blocking operation
return 'done'
async def main():
result = await asyncio.to_thread(blocking_task)
print(result)
asyncio.run(main())
# doneUsing asyncio.run inside a running loop: Jupyter notebooks already run an event loop. Use await coro directly in notebook cells, or install nest_asyncio to allow nested loops.
Quick-reference summary
| Pattern | When to use it |
|---|---|
asyncio.run(main()) | Start the event loop from synchronous code |
await coro | Run a coroutine and wait for its result |
asyncio.gather(*coros) | Run multiple coroutines concurrently, collect all results |
asyncio.create_task(coro) | Schedule a coroutine as a background Task |
asyncio.wait_for(coro, timeout=N) | Add a deadline to a coroutine |
asyncio.Queue | Decouple producers from consumers |
asyncio.to_thread(fn) | Run a blocking function without freezing the loop |