FastAPI Performance Optimization: Building Production APIs
4 mins read

FastAPI Performance Optimization: Building Production APIs

FastAPI has become the leading choice for building high-performance APIs in Python, offering automatic documentation, data validation, and exceptional speed. This guide covers advanced optimization techniques for production FastAPI applications.

Table of Contents

Figure: REST API architecture diagram — an overview of the structural design of a REST API.

Performance Architecture Overview

FastAPI’s performance comes from its foundation on Starlette and Pydantic, combined with async/await support that enables handling thousands of concurrent requests.


from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel, Field
import asyncio
import aioredis
from typing import List, Optional

# Application instance; this metadata feeds the auto-generated OpenAPI docs.
app = FastAPI(
    title="High-Performance API",
    description="Optimized FastAPI application",
    version="2.0.0"
)

# Compress responses larger than 1000 bytes to reduce bandwidth on big payloads.
app.add_middleware(GZipMiddleware, minimum_size=1000)

class UserModel(BaseModel):
    """Public representation of a user as returned by the API."""

    # Unique numeric identifier.
    id: int
    # Display name, constrained to 3-50 characters.
    username: str = Field(..., min_length=3, max_length=50)
    # Basic e-mail shape check. NOTE(review): `regex=` is the Pydantic v1
    # keyword; Pydantic v2 renamed it to `pattern=` — confirm pinned version.
    email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    # Account-status flag; new users default to active.
    is_active: bool = True

class UserCreate(BaseModel):
    """Inbound payload for creating a user (includes the raw password)."""

    # Same constraints as UserModel.username so validation stays consistent.
    username: str = Field(..., min_length=3, max_length=50)
    # NOTE(review): `regex=` is the Pydantic v1 keyword (v2 uses `pattern=`).
    email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    # Plaintext password from the client; minimum length enforced here only.
    password: str = Field(..., min_length=8)

# Connection pool for database
# Connection pool for database
class DatabaseManager:
    """Owns a shared aioredis connection pool for the whole application."""

    def __init__(self):
        # Pool is created lazily in connect(), called from the startup hook.
        self.pool = None

    async def connect(self):
        # One shared pool; per-request Redis clients borrow connections from it.
        self.pool = aioredis.ConnectionPool.from_url(
            "redis://localhost",
            max_connections=20
        )

    async def get_connection(self):
        # Client objects are cheap to create; the pooled connections are reused.
        return aioredis.Redis(connection_pool=self.pool)

# Module-level singleton shared by the route handlers below.
db_manager = DatabaseManager()

@app.on_event("startup")
async def startup():
    """Initialize the Redis pool before the app starts serving traffic."""
    # NOTE(review): on_event is deprecated in newer FastAPI releases in favor
    # of lifespan handlers — confirm the target FastAPI version.
    await db_manager.connect()

@app.on_event("shutdown")
async def shutdown():
    """Release pooled Redis connections on graceful shutdown."""
    # Guard: the pool is None if startup never ran (or connect() failed).
    if db_manager.pool:
        await db_manager.pool.disconnect()

Advanced Request Processing

FastAPI Performance Optimization: Building Production APIs

Efficient request processing maximizes throughput while maintaining data integrity:

FastAPI Performance Optimization: Building Production APIs

import asyncio
import time
from typing import List

from fastapi import BackgroundTasks

@app.post("/users/bulk", response_model=dict)
async def create_users_bulk(
    users: List[UserCreate],
    background_tasks: BackgroundTasks
):
    """Create many users concurrently in fixed-size batches.

    Processes `users` in batches of 50, running each batch's inserts
    concurrently. A failure for one record is counted rather than
    aborting the whole request.

    Returns a summary dict with total/processed/failed counts and the
    elapsed wall-clock time in seconds.
    """
    # Bug fix: this function used time.time() but `time` was never imported,
    # raising NameError on the first request. perf_counter is also monotonic,
    # so the measurement is immune to system clock adjustments.
    start_time = time.perf_counter()
    batch_size = 50
    processed = 0
    failed = 0

    for i in range(0, len(users), batch_size):
        batch = users[i:i + batch_size]
        # return_exceptions=True keeps one bad record from failing the batch.
        results = await asyncio.gather(
            *(process_user(user) for user in batch),
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, Exception):
                failed += 1
            else:
                processed += 1

    execution_time = time.perf_counter() - start_time

    # Runs after the response has been sent.
    background_tasks.add_task(cleanup_temp_data)

    return {
        "total": len(users),
        "processed": processed,
        "failed": failed,
        "execution_time": execution_time,
    }

async def process_user(user_data: UserCreate) -> UserModel:
    """Persist a single user and return the stored representation.

    Placeholder implementation: the awaited sleep stands in for the real
    database write, and the id is derived deterministically (per process)
    from the username.
    """
    await asyncio.sleep(0.01)  # stand-in for the actual DB round-trip
    synthetic_id = hash(user_data.username) % 10000
    return UserModel(
        id=synthetic_id,
        username=user_data.username,
        email=user_data.email,
        is_active=True,
    )

async def cleanup_temp_data():
    """Post-request housekeeping scheduled via BackgroundTasks."""
    # Background cleanup task — placeholder for real cleanup work
    # (temporary rows, stale cache entries, etc.).
    await asyncio.sleep(1)
    print("Cleanup completed")

Caching and Performance Optimization

Strategic caching dramatically improves API performance:


import hashlib
import json
from functools import wraps

class CacheManager:
    """Thin JSON-serializing cache layer on top of a Redis connection."""

    def __init__(self, redis_connection):
        self.redis = redis_connection
        self.default_ttl = 3600  # one hour

    def cache_key(self, prefix: str, **kwargs) -> str:
        """Build a deterministic key from the keyword arguments."""
        serialized = json.dumps(kwargs, sort_keys=True)
        digest = hashlib.md5(serialized.encode()).hexdigest()
        return f"{prefix}:{digest}"

    async def get_cached(self, key: str):
        """Return the decoded cached value, or None on a miss."""
        raw = await self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

    async def set_cache(self, key: str, data, ttl: int = None):
        """Store `data` as JSON under `key` with an expiry."""
        # Falsy ttl (None or 0) falls back to the default, matching setex's
        # requirement for a positive expiry.
        expiry = ttl or self.default_ttl
        await self.redis.setex(key, expiry, json.dumps(data, default=str))

def cache_response(prefix: str, ttl: int = 3600):
    """Decorator that caches an async endpoint's JSON-serializable result in Redis.

    Args:
        prefix: Namespace for the generated cache keys.
        ttl: Seconds before a cached entry expires.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Bug fix: the key was built with the built-in hash() on strings,
            # which is randomized per process (PYTHONHASHSEED) — every worker
            # and every restart produced different keys, defeating the cache.
            # Use a stable digest, consistent with CacheManager.cache_key.
            raw_key = repr(args) + repr(sorted(kwargs.items()))
            cache_key = f"{prefix}:{hashlib.md5(raw_key.encode()).hexdigest()}"

            redis_conn = await db_manager.get_connection()
            cache_manager = CacheManager(redis_conn)

            # Bug fix: `if cached:` treated falsy cached values ({} / 0 / "")
            # as misses and re-ran the function; test for a miss explicitly.
            cached = await cache_manager.get_cached(cache_key)
            if cached is not None:
                return cached

            # Cache miss: execute the endpoint and store the result.
            result = await func(*args, **kwargs)
            await cache_manager.set_cache(cache_key, result, ttl)

            return result
        return wrapper
    return decorator

@app.get("/users/{user_id}", response_model=UserModel)
@cache_response("user", ttl=1800)
async def get_user(user_id: int):
    """Fetch a single user by id; results are cached for 30 minutes."""
    # Simulate database lookup
    await asyncio.sleep(0.1)
    # NOTE(review): cache hits return a plain dict (from json.loads); FastAPI
    # re-validates it against response_model, so the shape must stay in sync.
    return UserModel(
        id=user_id,
        username=f"user_{user_id}",
        email=f"user_{user_id}@example.com",
        is_active=True
    )

These optimization techniques enable FastAPI applications to achieve exceptional performance while maintaining clean, maintainable code that scales with business requirements.

Leave a Reply

Your email address will not be published. Required fields are marked *