Curious How to Guard Your FastAPI with VIP Access?

VIP Passes: Crafting a Secure FastAPI with JWT and Scopes

Curious How to Guard Your FastAPI with VIP Access?

Implementing role-based authorization in FastAPI using JWT tokens and scopes is like giving out special VIP passes; only certain people can get into certain places. It’s a great way to make sure that only the right folks can access specific parts of your API. This approach boosts the security of your application and keeps everything running smoothly and safely.

First things first, let’s get FastAPI and its buddies installed. You’ll need FastAPI, Pydantic, Uvicorn, Passlib, and Python-Jose. These are the packages you’ll install to get everything ready for JWT handling.

pip install fastapi pydantic uvicorn passlib python-jose

With the installation out of the way, it’s time to set up the basic structure of your FastAPI project. Here’s a basic example to get you started:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from pydantic import BaseModel
from jose import JWTError, jwt
from passlib.context import CryptContext

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "your_secret_key_here"
ALGORITHM = "HS256"

pwd_context = CryptContext(schemes=["bcrypt"], default="bcrypt")

Next, it’s all about setting up your user model and authentication mechanisms. This includes creating a User model and functionality to authenticate users and generate JWT tokens.

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None
    role: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []

# Fake user database for demonstration
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "role": "admin"
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "[email protected]",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
        "role": "client"
    }
}

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return User(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

For role-based authorization, scopes are the way to go. Think of scopes like different levels of clearance. You can define what permissions are needed to access various endpoints.

Here’s how you can define roles and use a role checker:

class RoleChecker:
    def __init__(self, allowed_roles):
        self.allowed_roles = allowed_roles

    def __call__(self, user: User = Depends(get_current_active_user)):
        if user.role in self.allowed_roles:
            return True
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, 
            detail="You don't have enough permissions"
        )

admin_role_checker = RoleChecker(["admin"])
client_role_checker = RoleChecker(["client"])

@app.get("/admin-only")
async def read_admin_only(current_user: User = Depends(admin_role_checker)):
    return {"message": "Hello, admin!"}

@app.get("/client-only")
async def read_client_only(current_user: User = Depends(client_role_checker)):
    return {"message": "Hello, client!"}

Next up, you need a login endpoint to dish out these JWT tokens based on user credentials.

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=30)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": ["items:read", "items:write"]},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}

If you want to get into the nitty-gritty of permissions, SecurityScopes will be your awesome partner in crime. You can specify detailed scopes required for each endpoint.

from fastapi.security import SecurityScopes

@app.get("/items/")
async def read_items(current_user: User = Depends(get_current_active_user), scopes: SecurityScopes = SecurityScopes(["items:read"])):
    return {"message": "Hello, items!"}

@app.post("/items/")
async def create_item(current_user: User = Depends(get_current_active_user), scopes: SecurityScopes = SecurityScopes(["items:write"])):
    return {"message": "Item created!"}

To wrap things up, using JWT tokens and scopes for role-based authorization in FastAPI is a solid move. It’s a flexible and robust way to keep your APIs safe and make sure only the right people get the right access.

By setting up roles and carefully specifying scopes, you ensure that every user only gets to the parts of the API that they need. This setup isn’t just about safety—it makes the whole operation simpler to maintain and scale. Adding this to your toolbox means a more secure, organized, and efficient FastAPI application. Plus, keeping our applications tidy and safe makes life easier for everyone involved.