Role-based authorization in FastAPI with JSON Web Tokens (JWT) is a neat way to secure your APIs so that only authorized users can reach specific resources. It pairs the speed and flexibility of FastAPI with signed tokens that carry the caller's identity on every request.
So, why bother with Role-Based Access Control (RBAC) in the first place? Well, RBAC is a game-changer for managing permissions in today’s web apps. Instead of assigning permissions to each user individually, which is a total headache, you can assign roles. Each role comes with a specific set of permissions. This way, you can easily scale and manage who gets to do what in your app without tearing your hair out.
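To make the idea concrete, here is a minimal sketch of a role-to-permission mapping in plain Python. The role names and permission strings are made up for illustration and are not part of the FastAPI app built below.

```python
# Hypothetical roles and permission names, for illustration only.
ROLE_PERMISSIONS = {
    "admin": {"items:read", "items:write", "users:read"},
    "user": {"users:read"},
}


def has_permission(role: str, permission: str) -> bool:
    """Return True if the given role grants the given permission."""
    # Unknown roles fall back to an empty permission set.
    return permission in ROLE_PERMISSIONS.get(role, set())


print(has_permission("admin", "items:write"))  # True
print(has_permission("user", "items:write"))   # False
```

Granting a new capability to every admin then means editing one set instead of touching each user record.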
Ready to dive in? Great, let’s set up a new FastAPI project.
First things first. Create a directory for your project. Open your terminal and run:
mkdir myproject
cd myproject
Now, fire up a virtual environment:
python -m venv venv
source venv/bin/activate
Next, you’ll need some dependencies. Run:
pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
After you add an empty main.py and a requirements.txt listing those packages, your project directory should look like this:
myproject/
├── main.py
├── requirements.txt
└── venv/
Cool, let's move on and create the FastAPI app in your main.py file.
First, import the necessary modules:
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import List
Set up your FastAPI app and some essential configurations, such as your secret key and the JWT algorithm:
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], default="bcrypt")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
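A hard-coded secret is fine for a demo, but you probably don't want it in version control. One possible approach, sketched below, is to read the key from an environment variable. The variable name JWT_SECRET_KEY and the helper load_secret_key are assumptions made for this example, not something FastAPI or python-jose requires.

```python
import os
import secrets


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Read the signing key from the environment, or generate a throwaway one."""
    key = os.environ.get(env_var)
    if key:
        return key
    # Fallback for local development only: a fresh random key on every start
    # means previously issued tokens stop validating after a restart.
    return secrets.token_hex(32)


SECRET_KEY = load_secret_key()
```

With this in place you would export JWT_SECRET_KEY in your shell or deployment config before starting the server.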
Now, for the meat of it: user authentication and JWT tokens. This walkthrough covers the login endpoint and the helpers behind it; a registration endpoint would be built along the same lines.
First, a function to create access tokens:
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    # Copy the claims so the caller's dict is not mutated
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # Fall back to a 15-minute lifetime when none is given
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
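If you're curious what an HS256 token actually is, the sketch below builds and checks one with only the standard library. It is a simplified illustration, not a replacement for the JWT library used in this tutorial: the helper names sign_hs256 and verify_hs256 are invented here, and the verifier skips checks a real library performs, such as inspecting the header's alg field.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Return header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> dict:
    """Check signature and expiry; raise ValueError on any failure."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token")
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(b64url_decode(payload_b64))
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload


token = sign_hs256({"sub": "admin", "exp": int(time.time()) + 1800}, "your-secret-key")
print(verify_hs256(token, "your-secret-key")["sub"])  # admin
```

The takeaway is that the payload is only encoded, not encrypted. Anyone can read the claims, but without the secret nobody can change them and still pass the signature check.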
Next, a function to get the current user based on the token:
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Verifies the signature and the "exp" claim
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # The token only names the user; the role comes from the stored record
    user = get_user(fake_users_db, token_data.username)
    if not user:
        raise credentials_exception
    return user
The login endpoint lets users get their access token:
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
Authenticate users with this function:
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not pwd_context.verify(password, user.hashed_password):
        return False
    return user
Retrieve user details from the fake database. Here fake_users_db is a dict keyed by username, which you'll have to define yourself:
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None
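The snippets so far refer to Token, TokenData, User, UserInDB, and fake_users_db without defining them. Here's one possible shape for them, assuming each user record carries a username, a role, and a hashed password. This sketch uses standard-library dataclasses so it runs without extra packages. FastAPI's documentation describes dataclass support, but Pydantic BaseModel classes with the same fields are the more common choice. The user "alice" and the placeholder hashes are invented for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Token:
    access_token: str
    token_type: str


@dataclass
class TokenData:
    username: Optional[str] = None


@dataclass
class User:
    username: str
    role: str  # checked later by the role checker


@dataclass
class UserInDB(User):
    hashed_password: str


# Placeholder hashes: replace each with the output of
# pwd_context.hash("<that user's password>") before trying to log in.
fake_users_db = {
    "admin": {
        "username": "admin",
        "role": "admin",
        "hashed_password": "<bcrypt hash goes here>",
    },
    "alice": {
        "username": "alice",
        "role": "user",
        "hashed_password": "<bcrypt hash goes here>",
    },
}

admin = UserInDB(**fake_users_db["admin"])
print(admin.username, admin.role)  # admin admin
```

In main.py these definitions would need to sit above the functions and routes that mention them, since names like Token and User are evaluated when the decorators and function signatures run.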
Time to set up RBAC. Here’s how you define roles and permissions using dependency injection:
First, create a class to handle role checks:
class RoleChecker:
    def __init__(self, allowed_roles: List[str]):
        self.allowed_roles = allowed_roles

    def __call__(self, user: User = Depends(get_current_user)):
        # The caller is authenticated but lacks the role: 403 Forbidden, not 401
        if user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=403,
                detail="Operation not permitted",
            )
        return user
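The trick here is that an instance of a class with a __call__ method behaves like a function, so each checker can remember its own list of roles. The framework-free sketch below shows just that mechanism. DemoUser, DemoRoleChecker, and Forbidden are stand-ins invented for the example, with Forbidden playing the part of the HTTPException that the real checker raises.

```python
from dataclasses import dataclass
from typing import List


class Forbidden(Exception):
    """Stand-in for the HTTPException raised by the real checker."""


@dataclass
class DemoUser:
    username: str
    role: str


class DemoRoleChecker:
    def __init__(self, allowed_roles: List[str]):
        # Configuration is captured once, when the checker is created.
        self.allowed_roles = allowed_roles

    def __call__(self, user: DemoUser) -> DemoUser:
        # In FastAPI, `user` would be supplied by Depends(get_current_user).
        if user.role not in self.allowed_roles:
            raise Forbidden(f"role {user.role!r} not permitted")
        return user


admin_only = DemoRoleChecker(["admin"])
print(admin_only(DemoUser("root", "admin")).username)  # root

try:
    admin_only(DemoUser("alice", "user"))
except Forbidden as exc:
    print(exc)  # role 'user' not permitted
```

Because the check lives in one class, adding a new protected area is a matter of creating another instance with a different role list.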
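Define which roles each checker accepts. Despite the variable names, these lists hold role names, and a role only passes a checker whose list includes it, so an admin is not automatically let through the user checker: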
admin_permissions = ["admin"]
user_permissions = ["user"]
admin_role_checker = RoleChecker(allowed_roles=admin_permissions)
user_role_checker = RoleChecker(allowed_roles=user_permissions)
Protecting routes is easy now. Just use the role checkers:
@app.get("/items/")
async def read_items(current_user: User = Depends(admin_role_checker)):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/users/")
async def read_users(current_user: User = Depends(user_role_checker)):
    return [{"username": current_user.username}]
Great! Now it’s time to test everything.
Run your FastAPI app using uvicorn:
uvicorn main:app --reload
You can use curl or Postman to test the login endpoint and get a JWT token:
curl -X 'POST' \
  'http://127.0.0.1:8000/token' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=password&username=admin&password=yourpassword'
Once you have your JWT token, access protected routes like this:
curl -X 'GET' \
  'http://127.0.0.1:8000/items/' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer your-jwt-token'
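If you'd rather script the same two calls, here's a rough standard-library equivalent. The helper names are invented for this sketch, and main() assumes the server from this tutorial is running locally with an "admin" user whose password is "yourpassword".

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"


def build_login_request(username: str, password: str, base_url: str = BASE_URL):
    """Build the form-encoded POST that the /token endpoint expects."""
    body = urllib.parse.urlencode(
        {"grant_type": "password", "username": username, "password": password}
    ).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_items_request(token: str, base_url: str = BASE_URL):
    """Build a GET for /items/ that carries the bearer token."""
    return urllib.request.Request(
        f"{base_url}/items/",
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
        method="GET",
    )


def send(request: urllib.request.Request):
    """Send a request and parse the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def main() -> None:
    # Requires the FastAPI app to be running; not called automatically.
    token = send(build_login_request("admin", "yourpassword"))["access_token"]
    print(send(build_items_request(token)))
```

Call main() from a Python shell once uvicorn is up. A user without the admin role should get an HTTP error from the second request, which urlopen raises as urllib.error.HTTPError.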
And there you have it. Role-based authorization with JWTs in FastAPI comes down to three pieces: a login endpoint that issues signed tokens, a dependency that resolves the current user from a token, and role checkers that guard each route. Together they keep your access rules in one place, which makes the application both more secure and easier to manage.