FastAPI is a modern, fast web framework for building APIs with Python. When combined with Redis, a powerful in-memory data store, you can create high-performance applications with efficient caching and session management. Let’s dive into how to leverage these technologies together.
First, let’s set up our FastAPI project and install the necessary dependencies:
pip install fastapi redis uvicorn
Now, let’s create a basic FastAPI application and integrate Redis:
from fastapi import FastAPI, Depends
from redis import Redis
app = FastAPI()
def get_redis():
redis = Redis(host='localhost', port=6379, db=0)
try:
yield redis
finally:
redis.close()
@app.get("/")
async def root(redis: Redis = Depends(get_redis)):
return {"message": "Hello, FastAPI with Redis!"}
In this example, we’ve created a dependency get_redis()
that initializes a Redis connection. We can inject this dependency into our route handlers to access Redis.
Let’s implement a simple caching mechanism for a resource-intensive operation:
import time
from fastapi import HTTPException
@app.get("/slow-operation/{item_id}")
async def slow_operation(item_id: int, redis: Redis = Depends(get_redis)):
cache_key = f"slow_operation:{item_id}"
# Check if result is in cache
cached_result = redis.get(cache_key)
if cached_result:
return {"result": cached_result.decode(), "source": "cache"}
# Simulate a slow operation
time.sleep(2)
result = f"Result for item {item_id}"
# Store result in cache for 1 hour
redis.setex(cache_key, 3600, result)
return {"result": result, "source": "computed"}
This endpoint demonstrates how to use Redis for caching. We first check if the result is in the cache. If it is, we return it immediately. If not, we perform the slow operation, store the result in the cache, and then return it.
Now, let’s implement session management using Redis:
from fastapi import Request, Response
from uuid import uuid4
@app.post("/login")
async def login(username: str, password: str, response: Response, redis: Redis = Depends(get_redis)):
# In a real app, you'd verify credentials here
if username == "demo" and password == "password":
session_id = str(uuid4())
redis.setex(f"session:{session_id}", 3600, username)
response.set_cookie(key="session_id", value=session_id)
return {"message": "Login successful"}
else:
raise HTTPException(status_code=401, detail="Invalid credentials")
@app.get("/profile")
async def profile(request: Request, redis: Redis = Depends(get_redis)):
session_id = request.cookies.get("session_id")
if not session_id:
raise HTTPException(status_code=401, detail="Not authenticated")
username = redis.get(f"session:{session_id}")
if not username:
raise HTTPException(status_code=401, detail="Session expired")
return {"username": username.decode()}
@app.post("/logout")
async def logout(request: Request, response: Response, redis: Redis = Depends(get_reds)):
session_id = request.cookies.get("session_id")
if session_id:
redis.delete(f"session:{session_id}")
response.delete_cookie("session_id")
return {"message": "Logout successful"}
This code implements basic session management. When a user logs in, we create a session in Redis and set a cookie. The profile endpoint checks for a valid session, and the logout endpoint removes the session from Redis and clears the cookie.
To improve performance further, we can use Redis for rate limiting:
from fastapi import Request
import time
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
redis = request.app.state.redis
client_ip = request.client.host
key = f"rate_limit:{client_ip}"
current = redis.get(key)
if current is not None and int(current) > 10:
return JSONResponse(status_code=429, content={"error": "Too many requests"})
pipe = redis.pipeline()
pipe.incr(key)
pipe.expire(key, 60)
pipe.execute()
response = await call_next(request)
return response
# Don't forget to initialize Redis in your app startup event
@app.on_event("startup")
async def startup_event():
app.state.redis = Redis(host='localhost', port=6379, db=0)
@app.on_event("shutdown")
async def shutdown_event():
app.state.redis.close()
This middleware limits each IP to 10 requests per minute. It’s a simple implementation, but you can make it more sophisticated based on your needs.
When working with FastAPI and Redis, it’s crucial to handle connections efficiently. Here’s an improved version of our Redis dependency that uses connection pooling:
from redis import ConnectionPool, Redis
# Create a connection pool
redis_pool = ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
def get_redis():
return Redis(connection_pool=redis_pool)
This approach ensures that we’re not creating a new connection for every request, which can significantly improve performance under high load.
Let’s explore how we can use Redis for more advanced caching scenarios. For example, we might want to cache the results of a database query:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import json
Base = declarative_base()
engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker(bind=engine)
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String)
email = Column(String)
Base.metadata.create_all(bind=engine)
@app.get("/users")
async def get_users(redis: Redis = Depends(get_redis)):
cache_key = "all_users"
cached_users = redis.get(cache_key)
if cached_users:
return json.loads(cached_users)
db = SessionLocal()
users = db.query(User).all()
db.close()
user_list = [{"id": user.id, "name": user.name, "email": user.email} for user in users]
redis.setex(cache_key, 300, json.dumps(user_list)) # Cache for 5 minutes
return user_list
This example demonstrates caching the results of a database query. We first check if the data is in Redis. If it’s not, we query the database, cache the results, and then return them.
Redis can also be used for task queues. While there are dedicated task queue solutions like Celery, for simpler use cases, Redis can be a good fit:
import uuid
@app.post("/tasks")
async def create_task(task_data: dict, redis: Redis = Depends(get_redis)):
task_id = str(uuid.uuid4())
redis.lpush("task_queue", json.dumps({"id": task_id, "data": task_data}))
return {"task_id": task_id}
# In a separate worker process:
def process_tasks(redis_conn):
while True:
_, task = redis_conn.brpop("task_queue")
task_data = json.loads(task)
# Process the task...
print(f"Processing task {task_data['id']}")
This setup allows you to offload time-consuming tasks to a separate process, improving the responsiveness of your API.
When working with FastAPI and Redis, it’s important to consider error handling and resilience. Here’s an example of how you might implement retry logic for Redis operations:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def redis_get_with_retry(redis_conn, key):
return redis_conn.get(key)
@app.get("/resilient-data/{key}")
async def get_resilient_data(key: str, redis: Redis = Depends(get_redis)):
try:
data = redis_get_with_retry(redis, key)
return {"data": data.decode() if data else None}
except Exception as e:
return {"error": str(e)}
This approach uses the tenacity
library to implement a retry mechanism, which can help your application recover from temporary Redis connection issues.
As your application grows, you might find yourself needing to work with multiple Redis databases or even multiple Redis servers. Here’s how you can set up multiple Redis connections:
from functools import lru_cache
@lru_cache()
def get_redis_cache():
return Redis(host='localhost', port=6379, db=0)
@lru_cache()
def get_redis_session():
return Redis(host='localhost', port=6379, db=1)
@app.get("/multi-redis-example")
async def multi_redis_example(
cache_redis: Redis = Depends(get_redis_cache),
session_redis: Redis = Depends(get_redis_session)
):
cache_redis.set("cache_key", "This is cached data")
session_redis.set("session_key", "This is session data")
return {
"cache_data": cache_redis.get("cache_key"),
"session_data": session_redis.get("session_key")
}
This setup allows you to use different Redis databases for different purposes, keeping your data organized and separated.
When working with FastAPI and Redis, it’s also worth considering how to handle background tasks efficiently. FastAPI provides a BackgroundTasks
class that can be used for this purpose:
from fastapi import BackgroundTasks
def write_log(message: str):
with open("app.log", mode="a") as log:
log.write(message)
@app.post("/create-item/")
async def create_item(item: dict, background_tasks: BackgroundTasks, redis: Redis = Depends(get_redis)):
item_id = str(uuid.uuid4())
redis.set(f"item:{item_id}", json.dumps(item))
background_tasks.add_task(write_log, f"Item created with ID: {item_id}")
return {"item_id": item_id}
This approach allows you to perform non-critical operations (like logging) in the background, improving the response time of your API.
As we wrap up this exploration of FastAPI with Redis, it’s clear that this combination offers powerful tools for building high-performance, scalable web applications. From caching and session management to task queues and background processing, Redis complements FastAPI beautifully, allowing you to handle complex scenarios with ease.
Remember, while these examples provide a solid foundation, there’s always room for optimization and refinement based on your specific use case. As with any technology stack, it’s crucial to monitor performance, test thoroughly, and continuously iterate on your design to ensure it meets the needs of your users and your business.
Whether you’re building a small personal project or a large-scale enterprise application, the flexibility and speed offered by FastAPI and Redis make them an excellent choice. So go ahead, experiment with these techniques, and see how they can elevate your next Python web project!