
FastAPI Performance Optimization: Building Production APIs
FastAPI has become a leading choice for building high-performance APIs in Python, offering automatic documentation, data validation, and excellent throughput. This guide covers advanced optimization techniques for production FastAPI applications.
Performance Architecture Overview
FastAPI’s performance comes from its foundation on Starlette and Pydantic, combined with async/await support that enables handling thousands of concurrent requests.
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel, Field
import asyncio
from redis import asyncio as aioredis  # the standalone aioredis package is deprecated; it now ships inside redis-py
from typing import List

app = FastAPI(
    title="High-Performance API",
    description="Optimized FastAPI application",
    version="2.0.0"
)

# Compress responses larger than 1 KB
app.add_middleware(GZipMiddleware, minimum_size=1000)

class UserModel(BaseModel):
    id: int
    username: str = Field(..., min_length=3, max_length=50)
    # Pydantic v2 syntax; on Pydantic v1 use regex= instead of pattern=
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    is_active: bool = True

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    password: str = Field(..., min_length=8)

# Connection pool for Redis, created once and shared across requests
class DatabaseManager:
    def __init__(self):
        self.pool = None

    async def connect(self):
        self.pool = aioredis.ConnectionPool.from_url(
            "redis://localhost",
            max_connections=20
        )

    async def get_connection(self):
        return aioredis.Redis(connection_pool=self.pool)

db_manager = DatabaseManager()

# Note: recent FastAPI versions prefer a lifespan handler over on_event
@app.on_event("startup")
async def startup():
    await db_manager.connect()

@app.on_event("shutdown")
async def shutdown():
    if db_manager.pool:
        await db_manager.pool.disconnect()
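With the pool in place, handlers can receive a connection through FastAPI's dependency system instead of reaching for db_manager directly. A minimal sketch, assuming the illustrative get_redis helper and /ping route below (neither is part of the application above):

from redis.asyncio import Redis

# Illustrative dependency: hands each request a client bound to the shared pool
async def get_redis() -> Redis:
    return await db_manager.get_connection()

@app.get("/ping")
async def ping(redis: Redis = Depends(get_redis)):
    # PING round-trips to Redis and confirms the pool is reachable
    return {"redis_alive": await redis.ping()}

Because the pool is created once at startup, each request borrows an existing connection rather than paying the connection-setup cost.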
Advanced Request Processing
Efficient request processing maximizes throughput while maintaining data integrity:
import time
from fastapi import BackgroundTasks

@app.post("/users/bulk", response_model=dict)
async def create_users_bulk(
    users: List[UserCreate],
    background_tasks: BackgroundTasks
):
    start_time = time.time()
    batch_size = 50
    processed = 0
    failed = 0

    # Process users in fixed-size batches so concurrency and memory stay bounded
    for i in range(0, len(users), batch_size):
        batch = users[i:i + batch_size]
        tasks = [process_user(user) for user in batch]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                failed += 1
            else:
                processed += 1

    execution_time = time.time() - start_time

    # Schedule background cleanup to run after the response is sent
    background_tasks.add_task(cleanup_temp_data)

    return {
        "total": len(users),
        "processed": processed,
        "failed": failed,
        "execution_time": execution_time
    }

async def process_user(user_data: UserCreate) -> UserModel:
    # Simulate a database operation
    await asyncio.sleep(0.01)
    return UserModel(
        id=hash(user_data.username) % 10000,
        username=user_data.username,
        email=user_data.email,
        is_active=True
    )

async def cleanup_temp_data():
    # Background cleanup task; runs after the response has been returned
    await asyncio.sleep(1)
    print("Cleanup completed")
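For a quick local check, the bulk endpoint can be exercised with FastAPI's TestClient; the payload below is illustrative:

from fastapi.testclient import TestClient

# Using the client as a context manager fires the startup/shutdown handlers
with TestClient(app) as client:
    payload = [
        {"username": f"user{i}", "email": f"user{i}@example.com", "password": "secret123"}
        for i in range(120)
    ]
    response = client.post("/users/bulk", json=payload)
    print(response.json())  # e.g. {"total": 120, "processed": 120, "failed": 0, ...}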
Caching and Performance Optimization
Strategic caching dramatically improves API performance:
import hashlib
import json
from functools import wraps
from fastapi.encoders import jsonable_encoder

class CacheManager:
    def __init__(self, redis_connection):
        self.redis = redis_connection
        self.default_ttl = 3600

    def cache_key(self, prefix: str, **kwargs) -> str:
        # Deterministic key: sorted kwargs serialized, then hashed
        key_data = json.dumps(kwargs, sort_keys=True)
        key_hash = hashlib.md5(key_data.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get_cached(self, key: str):
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def set_cache(self, key: str, data, ttl: int | None = None):
        ttl = ttl or self.default_ttl
        await self.redis.setex(key, ttl, json.dumps(data, default=str))

def cache_response(prefix: str, ttl: int = 3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Built-in hash() is salted per process, so it cannot be used for
            # cache keys shared across workers; use a stable digest instead
            key_data = str(args) + str(sorted(kwargs.items()))
            cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"

            # Try cache first
            redis_conn = await db_manager.get_connection()
            cache_manager = CacheManager(redis_conn)
            cached = await cache_manager.get_cached(cache_key)
            if cached is not None:
                return cached

            # Execute the endpoint and cache a JSON-safe copy of the result
            # (Pydantic models are not directly json.dumps-serializable)
            result = await func(*args, **kwargs)
            await cache_manager.set_cache(cache_key, jsonable_encoder(result), ttl)
            return result
        return wrapper
    return decorator

@app.get("/users/{user_id}", response_model=UserModel)
@cache_response("user", ttl=1800)
async def get_user(user_id: int):
    # Simulate a database lookup
    await asyncio.sleep(0.1)
    return UserModel(
        id=user_id,
        username=f"user_{user_id}",
        email=f"user_{user_id}@example.com",
        is_active=True
    )
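One caveat with this decorator: entries stay stale until the TTL expires, so write paths should evict them explicitly. A minimal sketch, assuming a hypothetical update_user endpoint that rebuilds the key exactly as cache_response does (FastAPI invokes handlers with keyword arguments):

@app.put("/users/{user_id}", response_model=UserModel)
async def update_user(user_id: int, user: UserCreate):
    # ... persist the update here, then evict the stale cache entry
    key_data = str(()) + str(sorted({"user_id": user_id}.items()))
    cache_key = f"user:{hashlib.md5(key_data.encode()).hexdigest()}"
    redis_conn = await db_manager.get_connection()
    await redis_conn.delete(cache_key)
    return UserModel(id=user_id, username=user.username,
                     email=user.email, is_active=True)

Keys derived from opaque argument hashes make targeted eviction brittle; building keys from explicit fields (e.g. user:{user_id}) is easier to invalidate.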
Applied together, these techniques let FastAPI applications sustain high throughput in production while keeping the codebase clean and maintainable.