Overview
Python’s asyncio library provides cooperative concurrency via an event loop. It is the right tool for I/O-bound work: network calls, database queries, and file reads. For CPU-bound work, use multiprocessing or concurrent.futures.ProcessPoolExecutor instead. Read python for the project setup; this page covers the async programming model.
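For contrast, a minimal sketch of offloading CPU-bound work to a process pool; the crunch helper and the 10_000 bound are illustrative, not part of any project setup.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # CPU-bound work runs in a separate process, off the event loop
    return sum(i * i for i in range(n))

async def main() -> int:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns a future the event loop can await
        return await loop.run_in_executor(pool, crunch, 10_000)

if __name__ == "__main__":
    print(asyncio.run(main()))  # 333283335000
```

The __main__ guard matters here: process pools re-import the module in worker processes, and the guard prevents the pool from being created recursively.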
Use async def only for functions that await something
Mark a function async when it contains at least one await. A plain def that wraps an async def does not gain concurrency; the caller must await to actually run the coroutine.
```python
import asyncio
import httpx

async def fetch(url: str) -> bytes:
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        r.raise_for_status()
        return r.content
```

Never call an async function without await. fetch("https://example.com") creates a coroutine object and does nothing; a linter will flag it, and Python emits a "coroutine was never awaited" RuntimeWarning when the object is garbage-collected, but otherwise the bug is silent.
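A minimal sketch of the pitfall, using a stand-in coroutine (compute is illustrative):

```python
import asyncio

async def compute() -> int:
    # stand-in for any async def
    await asyncio.sleep(0)
    return 42

coro = compute()               # creates a coroutine object; nothing runs
print(type(coro).__name__)     # coroutine
coro.close()                   # dispose of it to silence the warning
print(asyncio.run(compute()))  # 42
```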
Use TaskGroup for concurrent I/O
asyncio.TaskGroup (Python 3.11+) is the correct way to run multiple coroutines concurrently. If any task raises, the group cancels the remaining tasks and re-raises all exceptions together as an ExceptionGroup.
```python
async def fetch_all(urls: list[str]) -> list[bytes]:
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(client.get(u)) for u in urls]
        # the TaskGroup block exits only after every task has finished
        return [t.result().content for t in tasks]
```

Avoid asyncio.gather(*coros) in new code. By default gather propagates the first exception but leaves the remaining tasks running, and with return_exceptions=True it buries exceptions in the result list. TaskGroup provides structured concurrency: tasks do not outlive the block that created them.
Handle cancellation explicitly
asyncio.CancelledError is raised at the await point where the task is suspended when it is cancelled. Do not swallow it: since Python 3.8 it derives from BaseException, so except Exception no longer catches it, but a bare except: does, and silencing the cancellation signal causes hangs. If you must clean up on cancellation, use finally.
```python
async def job() -> None:
    try:
        await do_work()
    except asyncio.CancelledError:
        await cleanup()
        raise  # always re-raise
    finally:
        release_resource()
```

Use asyncio.shield(coro) to protect a coroutine from cancellation when the outer task is cancelled and you need it to finish regardless.
```python
await asyncio.shield(critical_write(data))
```

shield does not prevent the outer task from being cancelled; it only prevents the inner coroutine from receiving the cancel signal.
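A sketch of that behavior; the names and timings are illustrative:

```python
import asyncio

results: list[str] = []

async def critical_write() -> None:
    # pretend this is a write that must not be interrupted
    await asyncio.sleep(0.05)
    results.append("written")

async def worker() -> None:
    try:
        await asyncio.shield(critical_write())
    except asyncio.CancelledError:
        # the outer task still sees the cancellation...
        results.append("worker cancelled")
        raise

async def main() -> None:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)  # ...but the shielded write still finishes

asyncio.run(main())
print(results)  # ['worker cancelled', 'written']
```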
Never block the event loop
A blocking call in an async function stalls every other coroutine until it returns. The event loop runs all coroutines on a single thread, so time.sleep, requests.get, or any CPU-heavy loop freezes the loop for its full duration.
```python
# Bad: blocks the event loop for the duration of the sleep
async def bad() -> None:
    time.sleep(1)

# Good: yields control back to the event loop
async def good() -> None:
    await asyncio.sleep(1)
```

For blocking I/O that has no async alternative, use asyncio.to_thread or loop.run_in_executor to run it in a thread pool.
```python
import asyncio

def _read(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

async def read_file(path: str) -> bytes:
    # both the open and the read happen in the worker thread,
    # and the file is closed even on error
    return await asyncio.to_thread(_read, path)
```

Use async with and async for for resource management
Context managers that hold async resources (database connections, HTTP clients) implement __aenter__ and __aexit__. Use async with to ensure they are released even on exception. Use async for for async generators and streams.
```python
async def stream_lines(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                process(line)
```

See fastapi for how FastAPI injects dependencies that use async with via its lifespan context.
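async for also drives async generators, that is, async def functions containing yield; a minimal sketch (countdown is illustrative):

```python
import asyncio

async def countdown(n: int):
    # async generator: may await between yields
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1

async def main() -> list[int]:
    # async comprehensions consume async generators too
    return [x async for x in countdown(3)]

print(asyncio.run(main()))  # [3, 2, 1]
```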
Separate sync and async code at the boundary
Write sync functions for pure computation. Write async functions for I/O. Pass data between them, not function references. A sync function calling asyncio.run(coro) is fine at the top-level entry point; do not call it from inside a running event loop.
```python
# Top-level script: one asyncio.run call
if __name__ == "__main__":
    asyncio.run(main())
```

Frameworks like FastAPI manage the event loop; do not call asyncio.run inside a route handler.
Test async code with pytest-asyncio
Use pytest-asyncio for testing coroutines. Mark tests with @pytest.mark.asyncio or set asyncio_mode = "auto" in pyproject.toml.
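With auto mode the per-test marker becomes unnecessary; a sketch of the pyproject.toml fragment (pytest reads this from the tool.pytest.ini_options table):

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```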
```python
import pytest

@pytest.mark.asyncio
async def test_fetch_returns_bytes() -> None:
    result = await fetch("https://httpbin.org/bytes/16")
    assert len(result) == 16
```

See python-testing for the full pytest fixture and mocking strategy.