Implementing authentication in a FastAPI application is key to locking down your API endpoints and ensuring only folks with the right credentials get access to sensitive info. The most common ways to do this are via JSON Web Tokens (JWT) and OAuth2 password flows. Scroll down for an easy-to-digest, fuss-free guide to setting up both methods in your FastAPI app.
Why We Need Authentication
So, why bother with authentication anyway? It’s like having a bouncer for your app. Only the people who should be there get in. It protects private data, gives users a personalized vibe, and keeps everyone honest. Without solid authentication, any random John or Jane Doe could stumble into your data, causing chaos and potential breaches.
Getting Started with FastAPI
Before jumping into the nitty-gritty, let’s get FastAPI up and running.
First, create a new project and knock out the essentials:
mkdir myproject
cd myproject
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn pyjwt "passlib[bcrypt]" python-multipart
The project should look something like this:
myproject
├── main.py
├── requirements.txt
└── venv
Kick things off with your main application file, main.py:
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from datetime import datetime, timedelta
import jwt
from passlib.context import CryptContext
app = FastAPI()
SECRET_KEY = "yoursecretkey"  # placeholder: load a long random value from the environment in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
JWT Authentication: Your Go-To Option
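JWTs are super popular for authentication because they’re compact and signed: the server can check a token’s signature and expiry without a database lookup. Keep in mind the payload is only encoded, not encrypted, so don’t stash secrets in it. Here’s how to get JWT up and running in FastAPI: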
Generating and verifying JWT tokens is the first step:
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
def verify_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=403, detail="Token has expired")
    except jwt.InvalidTokenError:
        # PyJWT's base class for every other validation failure (bad signature, malformed token, bad claims)
        raise HTTPException(status_code=403, detail="Invalid token")
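If you’re curious what jwt.encode and jwt.decode are doing for HS256, here’s a rough standard-library sketch. The helper names (b64url, sign_hs256, check_hs256) are made up for illustration, and this is not PyJWT’s actual implementation; in the app itself, stick with the library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding, the encoding JWT segments use."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signing the first two segments with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def check_hs256(token: str, secret: str) -> Optional[dict]:
    """Return the payload if the signature matches and the token is not expired, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            secret.encode("utf-8"),
            f"{header_b64}.{payload_b64}".encode("ascii"),
            hashlib.sha256,
        ).digest()
        # compare_digest avoids leaking how many bytes matched through timing
        if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
            return None
        payload = json.loads(b64url_decode(payload_b64))
    except ValueError:
        # Wrong segment count, bad base64, or bad JSON all land here
        return None
    if "exp" in payload and payload["exp"] < time.time():
        return None
    return payload
```

Notice that anyone can base64-decode the payload segment; the secret only protects against tampering, which is why the signature check comes before the payload is trusted.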
Next, create a custom JWT Bearer class to handle the JWT logic:
class JWTBearer(HTTPBearer):
    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)
    async def __call__(self, request: Request):
        # HTTPBearer parses the Authorization header; it returns None only when auto_error=False
        credentials: HTTPAuthorizationCredentials | None = await super().__call__(request)
        if credentials:
            if credentials.scheme.lower() != "bearer":
                raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
            if not self.verify_jwt(credentials.credentials):
                raise HTTPException(status_code=403, detail="Invalid or expired token.")
            return credentials.credentials
        else:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
    def verify_jwt(self, jwtoken: str) -> bool:
        # verify_token raises HTTPException on any failure, so map that to False
        try:
            payload = verify_token(jwtoken)
        except HTTPException:
            return False
        return bool(payload)
Now, secure your routes using the JWT Bearer:
jwt_bearer = JWTBearer()
@app.get("/protected")
async def protected_route(token: str = Depends(jwt_bearer)):
    return {"message": "Hello, authenticated user!"}
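For a feel of what happens to the Authorization header before your route runs, here’s a small sketch of the scheme/credentials split. The function name extract_bearer_token is hypothetical, and this only approximates what FastAPI’s HTTPBearer does for you; it’s here to make the parsing step concrete, not to replace the built-in class.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    # Split on the first space: "<scheme> <credentials>"
    scheme, _, credentials = authorization.partition(" ")
    credentials = credentials.strip()
    # Auth scheme names are case-insensitive, so compare in lowercase
    if scheme.lower() != "bearer" or not credentials:
        return None
    return credentials
```

A missing header, a different scheme such as Basic, or a bare "Bearer" with no token all come back as None, which is the point where the route would answer with an error instead of running.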
OAuth2 Password Flow: Another Strong Option
OAuth2 password flow is also pretty groovy for securely handling user credentials. Here’s how:
Start with defining user models and handling password hashing:
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None
class UserInDB(User):
    hashed_password: str
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    # Add more users as needed
}
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
    return pwd_context.hash(password)
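If salted hashing feels like a black box, this standard-library sketch shows the same idea that pwd_context handles for you. Heads up: it uses PBKDF2-SHA256 as a stand-in rather than bcrypt, and the function names, the storage format, and the iteration count are all assumptions picked for illustration.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 200_000  # illustrative work factor, not a recommendation


def hash_password(password: str) -> str:
    """Hash with a fresh random salt and pack everything needed to verify later."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        candidate = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
        )
        return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
    except ValueError:
        # Malformed stored value: treat it as a failed check
        return False
```

Because the salt is random, hashing the same password twice gives two different strings, yet both verify. That is also why the login code compares with a verify function instead of hashing the input and testing for equality.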
Set up token and token data models next:
class Token(BaseModel):
    access_token: str
    token_type: str
class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []
Generate access tokens:
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user_record = fake_users_db.get(form_data.username)
    if not user_record:
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    # Wrap the stored dict in the model so attribute access (user.hashed_password) works
    user = UserInDB(**user_record)
    if not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
Secure routes using OAuth2:
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = verify_token(token)
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except HTTPException:
        # verify_token reports every failure as an HTTPException; normalize it to a 401
        raise credentials_exception
    user = fake_users_db.get(token_data.username)
    if user is None:
        raise credentials_exception
    return user
@app.get("/users/me", response_model=User)  # response_model keeps hashed_password out of the response
async def read_users_me(current_user: dict = Depends(get_current_user)):
    return current_user
Testing Your Authentication
Testing is a must to make sure everything’s legit. Here’s how:
Generate tokens using a tool like curl:
curl -X POST \
  http://127.0.0.1:8000/token \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=password&username=johndoe&password=yourpassword'
Access protected routes using the token:
curl -X GET \
  http://127.0.0.1:8000/protected \
  -H 'Authorization: Bearer your_access_token'
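Prefer Python over curl? Here’s a sketch of the same two requests using only the standard library. The helper names (build_token_request, build_protected_request, fetch_json) are made up for this example, and it assumes the app is running locally on port 8000 as in the curl commands.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumed local dev server


def build_token_request(username: str, password: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """Form-encoded POST to /token, mirroring the first curl command."""
    body = urllib.parse.urlencode(
        {"grant_type": "password", "username": username, "password": password}
    ).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_protected_request(access_token: str, path: str = "/protected", base_url: str = BASE_URL) -> urllib.request.Request:
    """GET with the bearer token in the Authorization header, mirroring the second curl command."""
    return urllib.request.Request(
        f"{base_url}{path}",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )


def fetch_json(request: urllib.request.Request) -> dict:
    """Send the request and parse the JSON body (needs the server to be up)."""
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.load(response)


# With the server running, the round trip would look like:
#   token = fetch_json(build_token_request("johndoe", "yourpassword"))["access_token"]
#   fetch_json(build_protected_request(token))
```

Building the requests separately from sending them makes it easy to check the headers and body before anything goes over the wire.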
Wrapping It Up
Adding JWT or OAuth2 password flow authentication to your FastAPI app is a solid way to keep your API endpoints safe and sound. By following this guide, you’ll ensure that only authorized users can get at your sensitive info. Always test your setup thoroughly before getting it out there in the wild. Your app’s security depends on it!