How Can Role-Based Access Control Transform Your API Security in FastAPI?

Dive Into Secure APIs with FastAPI and Role-Based Access Control

Securing API Endpoints with Role-Based Access Control in FastAPI: A Practical Guide

If you’re diving into APIs with FastAPI, you know that security is a big deal. One of the most effective ways to manage access to your API endpoints is by using Role-Based Access Control (RBAC). It sounds fancy, but it actually simplifies the task of granting permissions by assigning roles to users instead of handling individual permissions. Let’s break down how you can set this up in a FastAPI application.

What’s Role-Based Access Control (RBAC)?

Think of a role as a job title within an organization. Every job title has specific duties (permissions) associated with it. When someone joins the organization, you just assign them a job title, and they automatically get all the permissions tied to that title. That’s exactly how RBAC works. You assign roles to users, and each role has a set of permissions. This makes managing who can do what a whole lot easier.
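To make the job-title analogy concrete, here is a minimal sketch in plain Python. The `ROLE_PERMISSIONS` table and the `has_permission` helper are hypothetical names used only for illustration; the permission strings mirror the ones that appear later in this guide.

```python
# Hypothetical role table: each role (job title) maps to a set of permissions.
ROLE_PERMISSIONS: dict[str, set[str]] = {
    "admin": {"items:read", "items:write", "users:read", "users:write"},
    "client": {"items:read"},
}


def has_permission(role: str, permission: str) -> bool:
    """Return True if the given role grants the given permission."""
    # Unknown roles map to an empty set, so they are denied by default.
    return permission in ROLE_PERMISSIONS.get(role, set())


print(has_permission("admin", "users:write"))   # True
print(has_permission("client", "items:write"))  # False
```

Changing what a role can do then means editing one entry in the table rather than touching every user.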

Setting Up FastAPI

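Before we get into the nitty-gritty of RBAC, we need to set up our FastAPI application. FastAPI is a modern Python framework for building APIs, known for its speed and efficiency. The examples in this guide use built-in generic types such as list[str], so run them on Python 3.9 or newer, and install the fastapi, PyJWT, and bcrypt packages first. Here’s a quick setup to get you started: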

from datetime import datetime, timedelta

import bcrypt
import jwt  # provided by the PyJWT package
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel

app = FastAPI()

# Reads the bearer token from the Authorization header; tokenUrl names the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

This snippet imports the libraries we need, creates the app, and declares the OAuth2 bearer scheme that pulls the token out of the Authorization header.

User Model and Authentication

Now, let’s create a user model that includes roles and permissions. We’ll use Pydantic models for this:

class UserBase(BaseModel):
    username: str
    password: str

class LoginData(UserBase):
    pass

class PyUser(UserBase):
    id: int
    permissions: list[str] = []
    role: str

# Demo-only in-memory "database"; the password field stores a bcrypt hash.
fake_users = [
    {
        'id': 1,
        'username': 'admin',
        'password': bcrypt.hashpw('adminpassword'.encode(), bcrypt.gensalt()).decode(),
        'permissions': ['items:read', 'items:write', 'users:read', 'users:write'],
        'role': 'admin',
    },
    {
        'id': 2,
        'username': 'client',
        'password': bcrypt.hashpw('clientpassword'.encode(), bcrypt.gensalt()).decode(),
        'permissions': ['items:read'],
        'role': 'client',
    },
]

Here, we’ve set up a simple user model and a fake user database for demonstration. Each user has a role and a list of permissions, and the password field holds a bcrypt hash rather than the plain-text password.

Authentication and Token Generation

Next, we need to handle user authentication and generate tokens:

def authenticate_user(username: str, password: str) -> PyUser:
    for obj in fake_users:
        if obj['username'] == username and bcrypt.checkpw(password.encode(), obj['password'].encode()):
            return PyUser(**obj)
    raise HTTPException(status_code=401, detail='Invalid credentials')

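def get_current_user(token: str = Depends(oauth2_scheme)) -> PyUser:
    try:
        payload = jwt.decode(token, 'secret', algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers expired tokens: ExpiredSignatureError is a subclass.
        raise HTTPException(status_code=401, detail='Invalid token')
    username = payload.get('sub')
    for obj in fake_users:
        if obj['username'] == username:
            return PyUser(**obj)
    # The token was valid but names no known user; reject instead of returning None.
    raise HTTPException(status_code=401, detail='Invalid token')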

We’ve defined functions to authenticate our users and to fetch the current user based on their token, ensuring they have valid credentials.
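To see how token validation behaves on its own, here is a small round-trip sketch using PyJWT outside of FastAPI. The names `SECRET_KEY`, `make_token`, and `read_subject` are assumptions made for this illustration, and the key is a throwaway demo value.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt  # PyJWT

# Assumed demo key; long enough to serve as a full-length HS256 key.
SECRET_KEY = "demo-only-secret-key-change-me-please-0123456789"
ALGORITHM = "HS256"


def make_token(username: str, lifetime: timedelta, key: str = SECRET_KEY) -> str:
    """Sign a token whose 'exp' claim lies `lifetime` from now."""
    expires_at = datetime.now(timezone.utc) + lifetime
    return jwt.encode({"sub": username, "exp": expires_at}, key, algorithm=ALGORITHM)


def read_subject(token: str) -> Optional[str]:
    """Return the 'sub' claim, or None if the token is expired or invalid."""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError:  # ExpiredSignatureError is a subclass
        return None
    return payload.get("sub")


fresh = make_token("admin", timedelta(minutes=30))
stale = make_token("admin", timedelta(minutes=-1))  # expired a minute ago
forged = make_token("admin", timedelta(minutes=30), key="x" * 48)  # signed with another key

print(read_subject(fresh))   # admin
print(read_subject(stale))   # None
print(read_subject(forged))  # None
```

Both the expired token and the one signed with a different key are rejected by the same `except` clause, which is why a single 401 response is enough in the dependency.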

@app.post("/token")
def login_for_access_token(data: LoginData):
    user = authenticate_user(data.username, data.password)
    access_token_expires = timedelta(minutes=30)
    access_token = jwt.encode({
        'sub': user.username,
        'role': user.role,
        'exp': datetime.utcnow() + access_token_expires
    }, 'secret', algorithm='HS256')
    return {'access_token': access_token, 'token_type': 'bearer'}

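We’ve added an endpoint that issues a token for authenticated users. The token carries their username and role and expires after 30 minutes. Note that this endpoint reads a JSON body, whereas the “Authorize” button in FastAPI’s interactive docs submits form data; to support it, accept OAuth2PasswordRequestForm from fastapi.security instead of LoginData.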
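The literal 'secret' key is fine for a demo, but a real deployment would normally load the signing key from configuration. A minimal sketch, assuming environment variables named `JWT_SECRET_KEY`, `JWT_ALGORITHM`, and `JWT_EXPIRE_MINUTES` (these names and the `JWTSettings` class are hypothetical, not part of FastAPI or PyJWT):

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class JWTSettings:
    secret_key: str
    algorithm: str = "HS256"
    expire_minutes: int = 30


def load_jwt_settings(env: Mapping[str, str] = os.environ) -> JWTSettings:
    """Build token settings from environment variables (names are assumptions)."""
    secret = env.get("JWT_SECRET_KEY")
    if not secret:
        # Fail fast instead of silently falling back to a guessable key.
        raise RuntimeError("JWT_SECRET_KEY is not set")
    return JWTSettings(
        secret_key=secret,
        algorithm=env.get("JWT_ALGORITHM", "HS256"),
        expire_minutes=int(env.get("JWT_EXPIRE_MINUTES", "30")),
    )


# Passing a plain dict makes the loader easy to try out without touching os.environ.
settings = load_jwt_settings({"JWT_SECRET_KEY": "change-me", "JWT_EXPIRE_MINUTES": "15"})
print(settings.algorithm, settings.expire_minutes)  # HS256 15
```

The resulting values would then replace the hard-coded 'secret', 'HS256', and 30-minute lifetime in the token endpoint.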

Enforcing Role-Based Access Control

To enforce RBAC, we need a mechanism to check user roles before allowing access to certain endpoints. FastAPI’s dependency injection system makes this straightforward:

class RoleChecker:
    def __init__(self, allowed_roles: list[str]):
        self.allowed_roles = allowed_roles

    def __call__(self, user: PyUser = Depends(get_current_user)):
        if user.role not in self.allowed_roles:
            raise HTTPException(status_code=403, detail='Operation not permitted')
        return user

@app.get("/admin/data")
def get_admin_data(_: PyUser = Depends(RoleChecker(allowed_roles=["admin"]))):
    return {"data": "This is admin data"}

@app.get("/client/data")
def get_client_data(_: PyUser = Depends(RoleChecker(allowed_roles=["client"]))):
    return {"data": "This is client data"}

Here, we define a RoleChecker class that takes a list of allowed roles. If the user’s role isn’t in the list, access is denied.

Fine-Grained Access Control with OAuth2 Scopes

For situations where roles aren’t flexible enough, you can use OAuth2 scopes for more fine-grained control:

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", scopes={'items': 'access items data'})

@app.get("/items", dependencies=[Depends(get_current_user)])
def get_items(token: str = Depends(oauth2_scheme)):
    return {"items": "This is items data"}

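Declaring scopes on the OAuth2 scheme lists them in the generated API docs, but it does not enforce them: the endpoint above still accepts any valid token. To actually restrict access, a dependency has to compare the scopes an endpoint requires with the permissions granted to the user.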

Wrap-Up

Implementing robust security in your FastAPI applications doesn’t have to be a daunting task. With RBAC, you can ensure that only authorized users have access to your API endpoints. From setting up the user model and authentication system to checking roles and using OAuth2 scopes for fine-grained control, we’ve covered all the necessary steps to get you up and running.

Here’s a complete example to bring everything together:

from datetime import datetime, timedelta

import bcrypt
import jwt  # provided by the PyJWT package
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel

app = FastAPI()

# Reads the bearer token from the Authorization header; tokenUrl names the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class UserBase(BaseModel):
    username: str
    password: str

class LoginData(UserBase):
    pass

class PyUser(UserBase):
    id: int
    permissions: list[str] = []
    role: str

# Demo-only in-memory "database"; the password field stores a bcrypt hash.
fake_users = [
    {
        'id': 1,
        'username': 'admin',
        'password': bcrypt.hashpw('adminpassword'.encode(), bcrypt.gensalt()).decode(),
        'permissions': ['items:read', 'items:write', 'users:read', 'users:write'],
        'role': 'admin',
    },
    {
        'id': 2,
        'username': 'client',
        'password': bcrypt.hashpw('clientpassword'.encode(), bcrypt.gensalt()).decode(),
        'permissions': ['items:read'],
        'role': 'client',
    },
]

def authenticate_user(username: str, password: str) -> PyUser:
    for obj in fake_users:
        if obj['username'] == username and bcrypt.checkpw(password.encode(), obj['password'].encode()):
            return PyUser(**obj)
    raise HTTPException(status_code=401, detail='Invalid credentials')

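def get_current_user(token: str = Depends(oauth2_scheme)) -> PyUser:
    try:
        payload = jwt.decode(token, 'secret', algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers expired tokens: ExpiredSignatureError is a subclass.
        raise HTTPException(status_code=401, detail='Invalid token')
    username = payload.get('sub')
    for obj in fake_users:
        if obj['username'] == username:
            return PyUser(**obj)
    # The token was valid but names no known user; reject instead of returning None.
    raise HTTPException(status_code=401, detail='Invalid token')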

@app.post("/token")
def login_for_access_token(data: LoginData):
    user = authenticate_user(data.username, data.password)
    access_token_expires = timedelta(minutes=30)
    access_token = jwt.encode({
        'sub': user.username,
        'role': user.role,
        'exp': datetime.utcnow() + access_token_expires
    }, 'secret', algorithm='HS256')
    return {'access_token': access_token, 'token_type': 'bearer'}

class RoleChecker:
    def __init__(self, allowed_roles: list[str]):
        self.allowed_roles = allowed_roles

    def __call__(self, user: PyUser = Depends(get_current_user)):
        if user.role not in self.allowed_roles:
            raise HTTPException(status_code=403, detail='Operation not permitted')
        return user

@app.get("/admin/data")
def get_admin_data(_: PyUser = Depends(RoleChecker(allowed_roles=["admin"]))):
    return {"data": "This is admin data"}

@app.get("/client/data")
def get_client_data(_: PyUser = Depends(RoleChecker(allowed_roles=["client"]))):
    return {"data": "This is client data"}

This example sets up a FastAPI application with role-based access control, ensuring that only users with the appropriate roles can access specific endpoints. Happy coding!