Overview

Each FastAPI process runs a single-threaded asyncio event loop, and every async def route shares it: one blocking call in one route stalls every other in-flight request until it returns. Keeping the full request path async is not optional; it is the performance contract of the framework. This page translates the general asyncio rules from python-async into FastAPI-specific patterns: which libraries to use, when to offload blocking work, and how to configure workers.

Use async def for all routes that touch I/O

FastAPI runs a plain def route in a thread pool by design, which is safe but adds thread-handoff overhead and takes the handler out of the event loop entirely. Declare every route async def when it touches a database, an HTTP client, or a cache.

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

# get_db, user_repo, and UserRead come from the app's dependency and model layers
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncSession = Depends(get_db)) -> UserRead:
    return await user_repo.get(db, user_id)

The one valid exception: a route that does pure computation with no I/O can be def. FastAPI will run it in a thread and the event loop stays unblocked. Do not use def as a workaround for synchronous libraries inside what is conceptually an async handler.

Use async database drivers exclusively

The three well-maintained async options for Postgres are asyncpg (fastest, lowest-level), psycopg[async] (psycopg 3, familiar API), and async SQLAlchemy built on either. Do not use psycopg2, records, or sync SQLAlchemy in an async route.

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
 
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10,
    max_overflow=20,
)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

Set expire_on_commit=False so the session does not expire ORM attributes after a commit; accessing an expired attribute triggers an implicit refresh query, which the async driver cannot run synchronously and fails with MissingGreenlet. See fastapi-lifespan for initializing the engine at startup and fastapi-dependencies for injecting the session per request.

Offload blocking sync libraries to a thread pool

When a library has no async API, run it in a thread pool with asyncio.to_thread. This keeps the event loop free while the blocking call runs in a worker thread.

import asyncio
from PIL import Image
import io
 
async def resize_image(data: bytes, width: int, height: int) -> bytes:
    def _resize() -> bytes:
        img = Image.open(io.BytesIO(data))
        img = img.resize((width, height))
        buf = io.BytesIO()
        img.save(buf, format="WEBP")
        return buf.getvalue()
 
    return await asyncio.to_thread(_resize)

asyncio.to_thread (Python 3.9+) is the preferred spelling; loop.run_in_executor(None, func) is the equivalent for older code, and both submit to the loop's default ThreadPoolExecutor (to_thread also propagates context variables). FastAPI re-exports run_in_threadpool from Starlette, which submits to AnyIO's worker-thread pool instead — the same pool that runs plain def routes. Never call time.sleep, requests.get, or synchronous file reads directly inside an async def route.
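A self-contained illustration of the two stdlib spellings, using time.sleep as a stand-in for any blocking library call (it runs in a worker thread here, never on the loop):

```python
import asyncio
import time

def blocking_task(x: int) -> int:
    time.sleep(0.05)  # stands in for any blocking library call
    return x * 2

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    # Both calls run blocking_task in the default thread pool; gather
    # runs them concurrently while the event loop stays free.
    return await asyncio.gather(
        asyncio.to_thread(blocking_task, 1),           # Python 3.9+
        loop.run_in_executor(None, blocking_task, 2),  # older spelling
    )

print(asyncio.run(main()))  # [2, 4]
```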

Never call time.sleep or requests inside async routes

time.sleep(n) inside an async def blocks the entire event loop for n seconds. All concurrent requests queue behind it. Use await asyncio.sleep(n) for delays and httpx.AsyncClient for outbound HTTP.

import httpx
 
async def fetch_exchange_rate(currency: str) -> float:
    async with httpx.AsyncClient() as client:
        r = await client.get(f"https://api.rates.example.com/{currency}")
        r.raise_for_status()
        return r.json()["rate"]

httpx.AsyncClient mirrors the requests API. A single AsyncClient instance can be shared across requests; see fastapi-lifespan for creating it at startup and fastapi-dependencies for injecting it. See python-async for the general rule on blocking calls.

Configure uvicorn workers to match CPU cores

A single uvicorn process uses one CPU core. For CPU-bound work and to survive a worker crash, run multiple workers.

# production: one worker per core; gunicorn manages restarts
gunicorn app.main:app -k uvicorn.workers.UvicornWorker -w 4 --bind 0.0.0.0:8000
 
# development: single worker with hot-reload
uvicorn app.main:app --reload --port 8000

For pure I/O-bound services, one worker per core is a sensible starting point; add more only if measurement shows requests queueing while cores sit idle. Each worker has its own event loop and its own connection pool, so size the Postgres pool per worker: with pool_size=10 and max_overflow=20, each worker can open up to 30 connections, and four workers can present up to 120 to the database. Use PgBouncer to multiplex when the total connection count matters.
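The arithmetic is worth writing down once: each worker can open up to pool_size + max_overflow connections, and the database sees the sum across workers.

```python
def peak_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    # Each worker owns an independent pool; the database sees the sum.
    return workers * (pool_size + max_overflow)

# With the engine settings above (pool_size=10, max_overflow=20) and 4 workers:
print(peak_connections(4, 10, 20))  # 120
```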

Limit concurrency with semaphores for downstream services

Async code can generate far more simultaneous requests to a downstream service than it could with thread-based code. Use a semaphore to cap in-flight calls.

import asyncio

import httpx

_sem = asyncio.Semaphore(50)

async def call_downstream(url: str) -> dict:
    async with _sem:
        # In production, reuse a shared AsyncClient (see above) rather
        # than creating one per call.
        async with httpx.AsyncClient() as client:
            r = await client.get(url, timeout=10.0)
            r.raise_for_status()
            return r.json()

Declare the semaphore at module level or attach it to app.state in lifespan. A semaphore per downstream service lets you tune the limit independently. See python-performance for profiling async bottlenecks.
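To see the cap in action, a self-contained sketch that counts peak in-flight calls under a semaphore (the sleep stands in for the downstream request):

```python
import asyncio

async def demo(limit: int, tasks: int) -> int:
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def call() -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for the downstream call
            active -= 1

    await asyncio.gather(*(call() for _ in range(tasks)))
    return peak

# 50 concurrent callers, but never more than 5 in flight at once:
print(asyncio.run(demo(5, 50)))  # 5
```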