Building Rate-Limited APIs with FastAPI and Redis

Why Rate Limiting Matters More Than You Think

Every API I have ever deployed to production has been abused within 48 hours: scrapers, bots, and misconfigured clients stuck in infinite retry loops. Without rate limiting, a single bad actor can bring down your entire service and rack up thousands in compute costs.

I have settled on a FastAPI plus Redis stack for rate limiting because it is fast, reliable, and horizontally scalable. Here is exactly how I build it.

Choosing Your Algorithm

Three rate limiting algorithms cover most use cases, and each has a different trade-off:

  • Fixed window: Simple but allows burst traffic at window boundaries
  • Sliding window: Smoother distribution, slightly more complex
  • Token bucket: Best for APIs that need burst tolerance with sustained limits

For most production APIs, I use the sliding window approach. It provides predictable behavior without the boundary burst problem of fixed windows.
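
To make the boundary burst concrete, here is a small pure-Python sketch (no Redis involved) that replays the same ten requests through a fixed-window counter and a sliding-window log. The function names, the limit of 5, and the timestamps are illustrative assumptions, not part of the production code.

```python
from collections import deque

def fixed_window_allows(timestamps, limit, window):
    """Count how many requests a fixed-window counter admits."""
    counts = {}
    allowed = 0
    for ts in timestamps:
        bucket = int(ts // window)  # index of the window this request falls into
        if counts.get(bucket, 0) < limit:
            counts[bucket] = counts.get(bucket, 0) + 1
            allowed += 1
    return allowed

def sliding_window_allows(timestamps, limit, window):
    """Count how many requests a sliding-window log admits."""
    log = deque()
    allowed = 0
    for ts in timestamps:
        # Drop entries that have left the trailing window.
        while log and log[0] <= ts - window:
            log.popleft()
        if len(log) < limit:
            log.append(ts)
            allowed += 1
    return allowed

# Five requests just before the 60 s boundary and five just after it.
burst = [59.0, 59.2, 59.4, 59.6, 59.8, 60.0, 60.2, 60.4, 60.6, 60.8]
fixed_allowed = fixed_window_allows(burst, limit=5, window=60)
sliding_allowed = sliding_window_allows(burst, limit=5, window=60)
print(fixed_allowed, sliding_allowed)  # 10 5
```

The fixed window admits all ten requests within two seconds because its counter resets at the 60-second mark. The sliding window admits only the first five.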

Setting Up Redis

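import time
import uuid

import redis.asyncio as redis
from fastapi import FastAPI, Request

app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        self.window_size = 60  # seconds

    async def check(self, key: str) -> bool:
        # time.time() is a true Unix timestamp. datetime.utcnow().timestamp()
        # is skewed by the local UTC offset because the datetime is naive.
        now = time.time()
        window_start = now - self.window_size

        # redis-py pipelines are transactional (MULTI/EXEC) by default.
        pipe = redis_client.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop expired entries
        # A unique member keeps two requests with the same timestamp from
        # collapsing into a single sorted-set entry.
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        pipe.zcard(key)  # requests in the window, including this one
        pipe.expire(key, self.window_size)
        results = await pipe.execute()

        # Rejected requests are recorded too, so a client that keeps
        # hammering the API stays limited until it backs off.
        request_count = results[2]
        return request_count <= self.rpm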

The Sliding Window Implementation

The sliding window uses a Redis sorted set that holds one entry per request, scored by the request's timestamp. On each request, we remove entries older than our window, add the new request, and count the remaining entries. This gives us an accurate count of requests in the last N seconds.

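The beauty of this approach is atomicity. A pipeline on its own only batches commands, but redis-py pipelines are transactional by default and wrap the queued commands in MULTI/EXEC. All four operations therefore execute as a single unit, and concurrent requests cannot interleave between the trim, the add, and the count.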
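
The trim, add, and count steps can also be modelled without Redis, which is handy for unit-testing the logic with a fake clock. The sketch below is an in-memory stand-in, not the Redis implementation. The class name `InMemorySlidingWindow` and the explicit `now` argument are assumptions made for illustration.

```python
import bisect

class InMemorySlidingWindow:
    """In-memory model of the sorted-set sliding window (single process only)."""

    def __init__(self, limit, window_size=60.0):
        self.limit = limit
        self.window_size = window_size
        self._entries = {}  # key -> sorted list of request timestamps

    def check(self, key, now):
        entries = self._entries.setdefault(key, [])
        # Step 1: drop timestamps at or before the window start
        # (the role ZREMRANGEBYSCORE plays in Redis).
        cutoff = bisect.bisect_right(entries, now - self.window_size)
        del entries[:cutoff]
        # Step 2: record this request (the role of ZADD).
        bisect.insort(entries, now)
        # Step 3: count what remains, including this request (the role of ZCARD).
        return len(entries) <= self.limit

limiter = InMemorySlidingWindow(limit=3, window_size=60)
results = [limiter.check("client-a", t) for t in (0.0, 1.0, 2.0, 3.0)]
print(results)  # [True, True, True, False]

# 63.5 seconds in, all four earlier entries have left the window.
later = limiter.check("client-a", 63.5)
print(later)  # True
```

Like the Redis version, this model records rejected requests too, so a client that keeps retrying while limited pushes its own recovery further out.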

FastAPI Middleware Integration

from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

rate_limiter = RateLimiter(requests_per_minute=60)

class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # request.client can be None, for example under some test clients.
        client_ip = request.client.host if request.client else "unknown"
        # Prefer the API key as the identity; fall back to the client IP.
        api_key = request.headers.get("X-API-Key", client_ip)

        key = f"rate_limit:{api_key}"
        allowed = await rate_limiter.check(key)

        if not allowed:
            # A rejected request has no quota left by definition.
            return JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(rate_limiter.rpm),
                    "X-RateLimit-Remaining": "0",
                    # Conservative upper bound: one full window.
                    "Retry-After": str(rate_limiter.window_size),
                },
            )

        response = await call_next(request)
        return response

app.add_middleware(RateLimitMiddleware)
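
A fixed Retry-After of 60 seconds is a safe upper bound, but the headers can be more informative. The helper below is a rough sketch of how they might be derived from the window state. `rate_limit_headers` and its parameters are hypothetical names, and it assumes you have already read the request count and the oldest timestamp in the window (for example with ZCARD and ZRANGE ... WITHSCORES).

```python
import math

def rate_limit_headers(limit, request_count, allowed, oldest_timestamp, now, window_size=60):
    """Build rate limit headers for one response (hypothetical helper)."""
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(max(0, limit - request_count)),
    }
    if not allowed:
        # The oldest entry leaves the window at oldest_timestamp + window_size.
        # Round up and never advertise less than one second.
        wait = math.ceil(oldest_timestamp + window_size - now)
        headers["Retry-After"] = str(max(1, wait))
    return headers

rejected = rate_limit_headers(60, 61, False, oldest_timestamp=100.0, now=130.5)
print(rejected)
# {'X-RateLimit-Limit': '60', 'X-RateLimit-Remaining': '0', 'Retry-After': '30'}

accepted = rate_limit_headers(60, 10, True, oldest_timestamp=100.0, now=130.5)
print(accepted)
# {'X-RateLimit-Limit': '60', 'X-RateLimit-Remaining': '50'}
```

If rejected requests are also recorded in the window, the computed Retry-After is only the earliest moment a slot could free up, so treat it as an estimate rather than a guarantee.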

Tiered Rate Limits by API Key

In production, you almost always need different rate limits for different users. The free tier gets 60 requests per minute, pro gets 600, enterprise gets 6,000, and internal services are unlimited.

TIER_LIMITS = {
    "free": 60,
    "pro": 600,
    "enterprise": 6000,
    "internal": float('inf')
}

# Returns float because the "internal" tier maps to float('inf').
async def get_tier_limit(api_key: str) -> float:
    tier = await redis_client.hget(f"api_key:{api_key}", "tier")
    return TIER_LIMITS.get(tier or "free", 60)
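
One variation worth considering is to represent "unlimited" as None instead of infinity, so the value never leaks into a header or a JSON body as "inf". The sketch below is a pure-Python illustration of that idea. `TIER_LIMITS_OPTIONAL`, `resolve_limit`, and `is_within_limit` are hypothetical names, and the tier string is assumed to have been looked up already.

```python
from typing import Optional

# Same tiers as above, with None standing in for "no limit".
TIER_LIMITS_OPTIONAL = {
    "free": 60,
    "pro": 600,
    "enterprise": 6000,
    "internal": None,
}

def resolve_limit(tier: Optional[str]) -> Optional[int]:
    """Map a tier name to its limit; missing or unknown tiers get the free limit."""
    return TIER_LIMITS_OPTIONAL.get(tier or "free", TIER_LIMITS_OPTIONAL["free"])

def is_within_limit(request_count: int, limit: Optional[int]) -> bool:
    """Unlimited tiers always pass; everyone else is compared to their limit."""
    return limit is None or request_count <= limit

print(resolve_limit(None))        # 60
print(resolve_limit("pro"))       # 600
print(resolve_limit("internal"))  # None
print(is_within_limit(61, resolve_limit("free")))  # False
```

With this shape, the middleware can skip the Redis round trip entirely when the resolved limit is None.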

Token Bucket for Burst Tolerance

Some endpoints need to allow short bursts while maintaining an overall rate limit. The token bucket algorithm handles this elegantly. Tokens refill at a steady rate up to the bucket's capacity, and each request consumes one token. If the bucket is empty, the request is rejected.

import time

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second

    async def consume(self, key: str) -> bool:
        now = time.time()
        bucket_key = f"bucket:{key}"

        # Note: this read-modify-write is not atomic. Two concurrent requests
        # can read the same token count, so run it in a Lua script or a
        # WATCH/MULTI transaction if strict limits matter.
        data = await redis_client.hgetall(bucket_key)
        tokens = float(data.get("tokens", self.capacity))
        last_refill = float(data.get("last_refill", now))

        # Refill for the time elapsed since the last write, capped at capacity.
        elapsed = now - last_refill
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        if tokens < 1:
            return False

        await redis_client.hset(bucket_key, mapping={
            "tokens": tokens - 1,
            "last_refill": now
        })
        await redis_client.expire(bucket_key, 300)
        return True
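
To see the burst-then-refill behavior without a Redis server, here is a minimal in-memory sketch of the same arithmetic with an injected clock. `InMemoryTokenBucket` and the explicit `now` argument are illustrative assumptions. It is a single-process model, not a replacement for the Redis-backed class.

```python
class InMemoryTokenBucket:
    """Single-process token bucket driven by caller-supplied timestamps."""

    def __init__(self, capacity, refill_rate):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = float(capacity)   # start with a full bucket
        self.last_refill = None

    def consume(self, now):
        if self.last_refill is None:
            self.last_refill = now
        # Add tokens for the elapsed time, never exceeding capacity.
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens < 1:
            return False
        self.tokens -= 1
        return True

bucket = InMemoryTokenBucket(capacity=3, refill_rate=0.5)

# A burst of four requests at t=0: the first three drain the bucket.
burst_results = [bucket.consume(0.0) for _ in range(4)]
print(burst_results)  # [True, True, True, False]

# At 0.5 tokens per second, a full token is back after two seconds.
after_one_second = bucket.consume(1.0)
after_two_seconds = bucket.consume(2.0)
print(after_one_second, after_two_seconds)  # False True
```

The capacity sets how large a burst is tolerated, and the refill rate sets the sustained limit. Here that is a burst of three and one request every two seconds.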

Monitoring and Alerting

Rate limiting without monitoring is flying blind. I track three key metrics:

  • Total 429 responses per minute: If this spikes, either limits are too tight or you are under attack
  • Unique IPs hitting limits: Distinguishes between one abusive client and a broad issue
  • p99 latency of rate limit checks: Redis should respond in under 1ms; if not, check your connection pool
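
As a rough sketch of what collecting these three metrics could look like inside one process, the class below keeps simple in-memory counters. `RateLimitMetrics` and its methods are hypothetical. In a real deployment you would more likely export the same numbers to whatever metrics system you already run.

```python
from collections import defaultdict

class RateLimitMetrics:
    """In-memory counters for the three metrics above (illustrative only)."""

    def __init__(self):
        self.rejections_per_minute = defaultdict(int)  # minute index -> 429 count
        self.limited_ips = set()                       # unique IPs that hit a limit
        self.check_latencies_ms = []                   # latency of each limiter check

    def record_check(self, latency_ms):
        self.check_latencies_ms.append(latency_ms)

    def record_rejection(self, client_ip, timestamp):
        self.rejections_per_minute[int(timestamp // 60)] += 1
        self.limited_ips.add(client_ip)

    def p99_latency_ms(self):
        """Nearest-rank 99th percentile, or None if nothing was recorded."""
        if not self.check_latencies_ms:
            return None
        ordered = sorted(self.check_latencies_ms)
        rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n) in integer math
        return ordered[rank - 1]

metrics = RateLimitMetrics()
for i in range(1, 101):
    metrics.record_check(float(i))  # pretend latencies of 1..100 ms

metrics.record_rejection("10.0.0.1", 30.0)
metrics.record_rejection("10.0.0.1", 45.0)
metrics.record_rejection("10.0.0.2", 61.0)

print(metrics.p99_latency_ms())          # 99.0
print(metrics.rejections_per_minute[0])  # 2
print(len(metrics.limited_ips))          # 2
```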

Production Checklist

Before deploying rate limiting to production, make sure you have covered these bases:

  • Redis persistence is configured so limits survive restarts
  • Rate limit headers are included in every response, not just 429s
  • Your load balancer forwards real client IPs correctly
  • Internal health check endpoints are excluded from rate limiting
  • You have a bypass mechanism for emergency debugging
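
The client IP item deserves a concrete illustration, because behind a load balancer `request.client.host` is usually the balancer's address. The helper below is a sketch of one common approach: read X-Forwarded-For and count hops from the right. `client_ip_from_forwarded` and `trusted_proxies` are hypothetical names, and the sketch assumes each trusted proxy appends the address it received the request from.

```python
def client_ip_from_forwarded(forwarded_for, peer_ip, trusted_proxies=1):
    """Pick the client IP from an X-Forwarded-For value (hypothetical helper).

    forwarded_for: raw header value, or None if the header is absent.
    peer_ip: address of the direct TCP peer (what request.client.host reports).
    trusted_proxies: number of proxies you operate in front of the app.
    """
    if not forwarded_for or trusted_proxies < 1:
        return peer_ip
    hops = [hop.strip() for hop in forwarded_for.split(",") if hop.strip()]
    if len(hops) < trusted_proxies:
        return peer_ip
    # Entries to the left of this position were supplied by the client
    # and can be spoofed, so they are ignored.
    return hops[-trusted_proxies]

# One load balancer in front: the last entry is the address it saw.
print(client_ip_from_forwarded("203.0.113.7", "10.0.0.5"))           # 203.0.113.7
# A spoofed leading entry is ignored.
print(client_ip_from_forwarded("1.2.3.4, 203.0.113.7", "10.0.0.5"))  # 203.0.113.7
# No header: fall back to the direct peer.
print(client_ip_from_forwarded(None, "10.0.0.5"))                    # 10.0.0.5
```

Taking the leftmost entry instead would let any client choose its own rate limit key, which defeats per-IP limiting.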

Rate limiting is one of those things that seems simple until you deploy it. Get the fundamentals right with Redis and FastAPI, and you will save yourself from the inevitable 3 AM incident.