Alright, so you want to dive into the nitty-gritty of implementing JWT (JSON Web Token) authentication in FastAPI, huh? Buckle up, because this journey is going to make your API routes solidly protected. Let’s get right into it and break this down step by step. By the end, you’ll have a smooth, secure authentication setup that’s ready for prime time.
First things first, you need to make sure your FastAPI environment is set up properly. This means getting everything in place—from setting up a virtual environment to installing the essential packages your project will depend on. Here’s a quick primer on how to get that done.
Kick things off by creating a virtual environment and installing the packages. The virtual environment keeps your project dependencies all neat and tidy, and here’s the bash magic to do it:
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn pyjwt "passlib[bcrypt]" python-multipart
Imagine your project structure looking somewhat like this:
myproject
├── main.py
├── requirements.txt
└── venv
Sound good? Sweet. Now, let’s get into JWT authentication: why it’s dope and how it works. JWTs are the go-to for API authentication because they’re compact, URL-safe, and easy to verify. Think of them as tiny signed certificates that say, “Yup, this user is legit!” They’re signed, not encrypted, so anyone holding a token can read its payload; keep secrets out of it. When a user logs in, the server gives them a JWT. From then on, the token rides along with each request in the Authorization header, so your server knows who’s who without a session lookup.
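To make the “signed certificate” idea concrete, here’s a minimal standard-library sketch of what an HS256 token looks like under the hood: two base64url-encoded JSON parts plus an HMAC-SHA256 signature over them. The helper names (sign_demo_token, verify_demo_token) are made up for this illustration, and the sketch skips things a real library handles, such as expiry and checking the header’s algorithm. Use PyJWT in the actual app.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, the way JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_demo_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256 (HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_demo_token(token: str, secret: str) -> Optional[dict]:
    """Return the payload if the signature matches, otherwise None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            secret.encode("utf-8"),
            f"{header_b64}.{payload_b64}".encode("ascii"),
            hashlib.sha256,
        ).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
            return None
        return json.loads(b64url_decode(payload_b64))
    except ValueError:
        # Wrong number of segments, bad base64, or bad JSON.
        return None


token = sign_demo_token({"sub": "alice"}, "demo-secret")
print(verify_demo_token(token, "demo-secret"))   # {'sub': 'alice'}
print(verify_demo_token(token, "wrong-secret"))  # None
# The payload is readable without the secret: signed, not encrypted.
print(json.loads(b64url_decode(token.split(".")[1])))  # {'sub': 'alice'}
```

The takeaway: the secret proves who issued the token and that nobody changed it, but it does nothing to hide what’s inside.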
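Okay, before jumping into coding, pin the packages so the install is repeatable. The passlib[bcrypt] extra pulls in the bcrypt backend used for hashing, and python-multipart is what FastAPI needs to parse the login form (it’s left unpinned here; pin it to whatever version you test with). Your requirements.txt file should have these entries:
fastapi~=0.109.0
uvicorn~=0.26.0
pyjwt~=2.8.0
passlib[bcrypt]~=1.7.4
python-multipart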
Cool? Let’s make it rain code. Start by importing the needed modules in your main.py file and creating the app instance that the route decorators will hang off:
from datetime import datetime, timedelta

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# The application object used by the @app.post / @app.get decorators.
app = FastAPI()
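With the imports out of the way, it’s time to define your user models and set up password hashing using passlib. Password hashing is like putting your passwords in a vault: you store only the hash, never the plain text. The fake_users_db dictionary at the end is a hypothetical in-memory stand-in for a real database, with a placeholder user for local testing. Here’s how it looks:
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    # Returns a salted bcrypt hash suitable for storing.
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    # Re-hashes the candidate with the stored salt and compares.
    return pwd_context.verify(plain_password, hashed_password)

# Assumption: placeholder user store; swap in a real database lookup.
fake_users_db: dict[str, UserInDB] = {
    "johndoe": UserInDB(
        username="johndoe",
        hashed_password=get_password_hash("secret"),
    ),
}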
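If you’re wondering what a password hasher actually does for you, here’s a rough standard-library sketch of the idea: a random salt per password, a deliberately slow hash, and a constant-time comparison. Note that this swaps in PBKDF2 from hashlib purely for illustration; it is not how passlib or bcrypt work internally, and the function names and iteration count are assumptions. Stick with passlib in the real app.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a tuned recommendation


def demo_hash(password: str) -> str:
    """Hash a password with a fresh random salt; returns 'salt$digest' in hex."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def demo_verify(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)


first = demo_hash("secret")
second = demo_hash("secret")
print(first != second)                # True: same password, different salts
print(demo_verify("secret", first))   # True
print(demo_verify("guess", first))    # False
```

Because every hash carries its own salt, two users with the same password end up with different stored values, which is the same property you get from pwd_context.hash.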
Next, we need to generate and verify JWTs. Think of the secret key here as the master password to your vault: anyone who has it can mint valid tokens. Generate your own with a tool like OpenSSL (openssl rand -hex 32) and keep it out of source control; the value below is only a placeholder:
from datetime import timezone  # for timezone-aware expiry timestamps

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware datetime.
    now = datetime.now(timezone.utc)
    expire = now + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> dict | None:
    # Returns the decoded payload, or None if the token is invalid or expired.
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        return None
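A hardcoded key is fine for a demo but not for anything you deploy. Here’s one possible sketch for generating a key in Python and loading it from the environment instead. The variable name JWT_SECRET_KEY and the load_secret_key helper are assumptions for this example, not something FastAPI or PyJWT require.

```python
import os
import secrets


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Read the signing key from the environment and fail loudly if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"Set {env_var} before starting the app")
    return key


# One-off: print a fresh 256-bit key (64 hex characters), the same shape
# of value that `openssl rand -hex 32` produces.
print(secrets.token_hex(32))
```

In main.py you could then write SECRET_KEY = load_secret_key() so the app refuses to start without a key rather than quietly signing tokens with a default.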
Awesome. Now, set up OAuth2 with a token endpoint. This is where users will send their login credentials (as form fields named username and password) to get a shiny new JWT:
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # fake_users_db is assumed to map usernames to UserInDB objects.
    user = fake_users_db.get(form_data.username)
    # Same error for an unknown user and a wrong password, so the
    # response does not reveal which usernames exist.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
Protect your routes using dependency injection. Any route that depends on get_current_user only runs when the request carries a valid token for a known user:
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # verify_token returns None for invalid or expired tokens, so check
    # for that before touching the payload.
    payload = verify_token(token)
    if payload is None:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = fake_users_db.get(token_data.username)
    if user is None:
        raise credentials_exception
    return user

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
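Before get_current_user ever sees a token, the oauth2_scheme dependency has to pull it out of the Authorization header. Conceptually that step looks something like the sketch below. The extract_bearer_token function is a hypothetical stand-in written for this post, not FastAPI’s actual code; the real dependency raises a 401 instead of returning None.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, else None."""
    if not authorization:
        return None
    # Split on the first space only: "<scheme> <credentials>".
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    # The scheme name is compared case-insensitively.
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwdw=="))  # None
print(extract_bearer_token(None))                  # None
```

This is why the client has to send the header in exactly that shape: the word Bearer, a space, then the token.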
For a fancier validation setup, create a custom JWTBearer class. It’s an extra step, but it lets you validate the token inside the security scheme itself instead of in a separate dependency:
from fastapi import Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

class JWTBearer(HTTPBearer):
    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> str:
        credentials: HTTPAuthorizationCredentials | None = await super().__call__(request)
        # With auto_error=False the parent returns None when the header is missing.
        if credentials is None:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        # Scheme names are case-insensitive, so accept "bearer" as well.
        if credentials.scheme.lower() != "bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return credentials.credentials

    def verify_jwt(self, jwtoken: str) -> bool:
        # Catch only token errors; a bare except would also hide real bugs.
        try:
            jwt.decode(jwtoken, SECRET_KEY, algorithms=[ALGORITHM])
        except InvalidTokenError:
            return False
        return True

jwt_bearer = JWTBearer()

@app.get("/protected")
async def protected_route(token: str = Depends(jwt_bearer)):
    return {"message": "Hello, authenticated user!"}
Alright, time to test your JWT authentication setup. Start the server with uvicorn main:app --reload, then use a tool like Postman or FastAPI’s built-in Swagger UI at /docs. Here’s a quick way to do it:
- Login and Get Token: Send a POST request to the /token endpoint with your username and password as form fields. You’ll get an access token in return.
- Use the Token: Include this access token in the Authorization header of your subsequent requests to protected routes, in the form Bearer <token>.
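If you’d rather script those two steps than click through a UI, here’s a minimal standard-library sketch. It assumes the server is running locally on port 8000 and that a user named johndoe with password secret exists; both are placeholders, and the helper names are invented for this example.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumed local dev server address


def build_login_request(username: str, password: str) -> urllib.request.Request:
    """POST to /token with form-encoded credentials."""
    form = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(
        f"{BASE_URL}/token", data=form.encode("ascii"), method="POST"
    )


def build_authorized_request(path: str, token: str) -> urllib.request.Request:
    """GET a protected path with the token in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}{path}", headers={"Authorization": f"Bearer {token}"}
    )


def run_demo() -> None:
    """Log in, then call /users/me. Needs the server to be running."""
    with urllib.request.urlopen(build_login_request("johndoe", "secret")) as resp:
        token = json.load(resp)["access_token"]
    with urllib.request.urlopen(build_authorized_request("/users/me", token)) as resp:
        print(json.load(resp))
```

Call run_demo() from a Python shell once the app is up. A missing or bad token should make the second request fail with an HTTP error instead of returning the user.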
In conclusion, JWT-based authentication in FastAPI is a straightforward way to level up your API security. By following these steps, you make sure that only requests carrying a valid, unexpired token reach your protected routes. Because the token is self-contained, the server can check it without a session lookup on every request. Just remember that tokens are only as safe as the secret key and the connection they travel over, so keep the key private and serve the API over HTTPS.
Keep in mind that security is not a one-time fix. Regularly updating dependencies and reviewing your authentication mechanisms is crucial to maintaining a robust and secure environment.