Supercharge Your FastAPI: Async Tasks Made Easy with Celery Integration

Integrating Celery with FastAPI enables asynchronous task processing. Celery offloads time-consuming operations, such as image processing, to background workers, so the API can respond without waiting for them to finish.

FastAPI is a modern, high-performance web framework for building APIs with Python. It handles ordinary request/response work well, but a time-consuming operation that runs inside a request handler keeps the client waiting until it finishes. That’s where Celery comes in handy. Celery is a distributed task queue that runs tasks asynchronously in separate worker processes, which makes it a good fit for complex operations that would otherwise block your API responses.

Let’s dive into how we can integrate Celery with FastAPI to supercharge our application’s performance. We’ll start by setting up our project structure and installing the necessary dependencies.

First, create a new directory for your project and navigate into it:

mkdir fastapi-celery-project
cd fastapi-celery-project

Now, let’s set up a virtual environment and install the required packages:

python -m venv venv
source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
pip install fastapi uvicorn celery redis

We’re using Redis as our message broker for Celery, but you could also use RabbitMQ if you prefer.
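
Switching brokers is mostly a matter of changing the connection URL. The sketch below is one way to keep that choice out of the code. It assumes a hypothetical CELERY_BROKER_URL environment variable and a hypothetical get_broker_url helper; neither is something Celery reads or provides on its own.

```python
import os

# Default used throughout this article: a local Redis instance.
DEFAULT_BROKER_URL = "redis://localhost:6379"


def get_broker_url(environ=os.environ):
    """Return the broker URL, preferring the (hypothetical) CELERY_BROKER_URL variable."""
    return environ.get("CELERY_BROKER_URL", DEFAULT_BROKER_URL)


if __name__ == "__main__":
    # A RabbitMQ broker uses the amqp:// scheme instead of redis://
    print(get_broker_url({"CELERY_BROKER_URL": "amqp://guest:guest@localhost:5672//"}))
    print(get_broker_url({}))
```

The returned string could then be passed as the broker argument when the Celery instance is created.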

Let’s create our FastAPI application. Create a file named main.py and add the following code:

from fastapi import FastAPI
from celery import Celery

app = FastAPI()

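# The result backend stores task results; without one, the status and
# result lookups in get_result below cannot work.
celery = Celery(
    "tasks",
    broker="redis://localhost:6379",
    backend="redis://localhost:6379",
)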

@celery.task
def add_numbers(x, y):
    return x + y

@app.get("/add/{x}/{y}")
async def add(x: int, y: int):
    task = add_numbers.delay(x, y)
    return {"task_id": task.id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    task = add_numbers.AsyncResult(task_id)
    if task.ready():
        return {"result": task.result}
    return {"status": "Processing"}

In this example, we’ve created a simple FastAPI application with two endpoints. The /add/{x}/{y} endpoint triggers a Celery task to add two numbers, while the /result/{task_id} endpoint allows us to check the status of a task and retrieve its result.

Now, let’s break down what’s happening here:

  1. We import FastAPI and Celery.
  2. We create our FastAPI application instance.
  3. We set up our Celery instance, specifying Redis as the message broker.
  4. We define a Celery task called add_numbers that simply adds two numbers.
  5. We create two FastAPI routes: one to trigger the task and another to check its status.

The add endpoint creates a new Celery task and returns the task ID. The get_result endpoint checks if the task is complete and returns the result if it’s ready, or a “Processing” status if it’s still running.

To run this application, we need to start three components: the FastAPI server, the Celery worker, and Redis.

First, make sure Redis is running on your system. If you’re using Docker, you can start Redis with:

docker run -d -p 6379:6379 redis

Now, let’s start our FastAPI server:

uvicorn main:app --reload

In a separate terminal, start the Celery worker:

celery -A main.celery worker --loglevel=info

With everything up and running, you can now test your application. Open your browser and go to http://localhost:8000/add/5/3. You should see a response with a task ID. Copy this ID and use it to check the result at http://localhost:8000/result/{task_id}.
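
Checking the result by hand gets tedious, so a client would normally poll. Here is a minimal sketch of such a client using only the standard library. It assumes the server is reachable at http://localhost:8000 as above; the function names, the polling interval, and the timeout are illustrative choices.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:8000"  # assumes the uvicorn server started above


def fetch_json(path):
    """GET a path on the API and decode the JSON body."""
    with urllib.request.urlopen(BASE_URL + path) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_result(task_id, fetch=fetch_json, interval=0.5, timeout=30.0):
    """Poll /result/{task_id} until a result appears or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch(f"/result/{task_id}")
        if "result" in body:
            return body["result"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} not finished after {timeout}s")
        time.sleep(interval)


# Usage, with the server, worker, and Redis running:
#   task_id = fetch_json("/add/5/3")["task_id"]
#   print(wait_for_result(task_id))
```

The fetch parameter exists so the polling loop can be exercised with a stand-in function instead of a live server.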

This setup is great for simple tasks, but in real-world scenarios, you’ll often need to handle more complex operations. Let’s expand our example to include a more realistic use case: image processing.

First, let’s install Pillow, a Python imaging library, along with python-multipart, which FastAPI requires for file uploads:

pip install pillow python-multipart

Now, update your main.py file:

import base64
import io
import time
from fastapi import FastAPI, File, UploadFile
from celery import Celery
from PIL import Image

app = FastAPI()
# The result backend is needed so the status endpoint can look tasks up
celery = Celery(
    "tasks",
    broker="redis://localhost:6379",
    backend="redis://localhost:6379",
)

@celery.task
def process_image(image_b64):
    # Simulate a time-consuming task
    time.sleep(5)

    # Celery's default JSON serializer cannot carry raw bytes,
    # so the image arrives as a base64-encoded string
    image = Image.open(io.BytesIO(base64.b64decode(image_b64)))
    # Perform some image processing (e.g., resize)
    processed_image = image.resize((100, 100))

    # Save the processed image to a byte stream
    img_byte_arr = io.BytesIO()
    processed_image.save(img_byte_arr, format='PNG')
    return base64.b64encode(img_byte_arr.getvalue()).decode("ascii")

@app.post("/process-image/")
async def upload_image(file: UploadFile = File(...)):
    contents = await file.read()
    # Encode the upload so it survives JSON serialization
    task = process_image.delay(base64.b64encode(contents).decode("ascii"))
    return {"task_id": task.id}

@app.get("/image-result/{task_id}")
async def get_image_result(task_id: str):
    task = process_image.AsyncResult(task_id)
    if task.failed():
        return {"status": "Failed", "error": str(task.result)}
    if task.ready():
        return {"status": "Complete", "result": "Image processed successfully"}
    return {"status": "Processing"}

In this updated version, we’ve added an image processing task. The /process-image/ endpoint accepts an uploaded image file, reads its contents, and passes them to a Celery task. The task simulates a time-consuming operation by sleeping for 5 seconds, then resizes the image to 100x100 pixels.

This setup allows your API to quickly respond to the upload request while the actual image processing happens in the background. The client can then use the /image-result/{task_id} endpoint to check on the status of their task.
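
One detail worth knowing when passing file contents to a task: Celery serializes task arguments as JSON by default, and JSON cannot represent raw bytes. A common workaround is to base64-encode the bytes before queuing and decode them inside the task. The sketch below shows that round trip with the standard library only; the helper names are illustrative.

```python
import base64
import json


def encode_payload(data: bytes) -> str:
    """Turn raw bytes into an ASCII string that JSON can carry."""
    return base64.b64encode(data).decode("ascii")


def decode_payload(text: str) -> bytes:
    """Recover the original bytes inside the task."""
    return base64.b64decode(text)


if __name__ == "__main__":
    raw = b"\x89PNG\r\n\x1a\n"  # the signature bytes at the start of a PNG file
    # json.dumps(raw) would raise TypeError; the encoded string works
    message = json.dumps({"args": [encode_payload(raw)]})
    restored = decode_payload(json.loads(message)["args"][0])
    print(restored == raw)
```

Base64 makes the payload roughly a third larger, so for big uploads it may be better to save the file to shared storage and pass only its location to the task.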

Now, let’s talk about some best practices and advanced features you might want to consider when implementing Celery with FastAPI:

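  1. Error Handling: In production, you’ll want to implement proper error handling for your Celery tasks. Use try/except blocks within your tasks to log or wrap errors with a meaningful message, then re-raise them so Celery records the task as failed. A task that returns an error message instead is recorded as successful.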

  2. Task Prioritization: Celery can route tasks to different queues. By defining multiple queues and dedicating workers to the important ones, you keep urgent tasks from waiting behind slow ones.

  3. Scheduled Tasks: Celery can also handle scheduled tasks. You can use Celery Beat to run tasks periodically, like daily reports or data cleanup jobs.

  4. Task Chaining: For complex workflows, Celery supports task chaining. You can create a sequence of tasks where the output of one task becomes the input for the next.

  5. Task Monitoring: In a production environment, you’ll want to monitor your Celery tasks. Tools like Flower provide a real-time web-based monitor for Celery.

  6. Result Backend: Celery stores no task results unless a result backend is configured, and without one the status and result lookups shown above cannot work. Redis or a database are common choices for production; set a result expiry so stored results don’t accumulate.

  7. Celery Configuration: As your application grows, you might want to separate your Celery configuration into its own file for better organization.

Let’s implement some of these advanced features. First, let’s create a celery_config.py file:

from celery import Celery

celery_app = Celery("tasks",
                    broker="redis://localhost:6379",
                    backend="redis://localhost:6379")

celery_app.conf.task_routes = {
    "tasks.high_priority": {"queue": "high_priority"},
    "tasks.low_priority": {"queue": "low_priority"},
}

celery_app.conf.beat_schedule = {
    "run-every-hour": {
        "task": "tasks.scheduled_task",
        "schedule": 3600.0,
    },
}
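
The schedule value does not have to be a raw number of seconds. As a small sketch, the same hourly entry can be written with datetime.timedelta, which Celery also accepts. The dictionary below is plain data; the "daily-report" entry and its task name are hypothetical and only illustrate a second schedule.

```python
from datetime import timedelta

# Plain dictionary in the shape expected by celery_app.conf.beat_schedule.
BEAT_SCHEDULE = {
    "run-every-hour": {
        "task": "tasks.scheduled_task",
        "schedule": timedelta(hours=1),  # same interval as 3600.0 seconds
    },
    "daily-report": {
        "task": "tasks.daily_report",    # hypothetical task, not defined in this article
        "schedule": timedelta(days=1),
    },
}

if __name__ == "__main__":
    for name, entry in BEAT_SCHEDULE.items():
        print(name, entry["schedule"].total_seconds())
```

Assigning this dictionary to celery_app.conf.beat_schedule would replace the version above. For calendar-style schedules, such as "every day at midnight", Celery provides crontab in celery.schedules.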

Now, let’s update our main.py to use this configuration and implement some advanced features:

import base64
import io
import time
from fastapi import FastAPI, File, UploadFile, HTTPException
from celery import chain
from PIL import Image
from celery_config import celery_app

app = FastAPI()

# Celery's default JSON serializer cannot carry raw bytes, so images are
# passed to and between tasks as base64-encoded strings.
def encode_image(image_bytes):
    return base64.b64encode(image_bytes).decode("ascii")

def decode_image(image_b64):
    return base64.b64decode(image_b64)

@celery_app.task(name="tasks.high_priority")
def process_image(image_b64):
    try:
        # Simulate a time-consuming task
        time.sleep(5)

        image = Image.open(io.BytesIO(decode_image(image_b64)))
        # Perform some image processing (e.g., resize)
        processed_image = image.resize((100, 100))

        # Save the processed image to a byte stream
        img_byte_arr = io.BytesIO()
        processed_image.save(img_byte_arr, format='PNG')
        return encode_image(img_byte_arr.getvalue())
    except Exception as e:
        # Re-raise so Celery marks the task as failed instead of succeeded
        raise RuntimeError(f"Error processing image: {e}") from e

@celery_app.task(name="tasks.low_priority")
def compress_image(image_b64):
    try:
        image = Image.open(io.BytesIO(decode_image(image_b64)))
        img_byte_arr = io.BytesIO()
        # JPEG has no alpha channel, so convert to RGB before saving
        image.convert("RGB").save(img_byte_arr, format='JPEG', quality=50)
        return encode_image(img_byte_arr.getvalue())
    except Exception as e:
        raise RuntimeError(f"Error compressing image: {e}") from e

@celery_app.task(name="tasks.scheduled_task")
def cleanup_old_tasks():
    # Implement cleanup logic here
    print("Cleaning up old tasks...")

@app.post("/process-image/")
async def upload_image(file: UploadFile = File(...)):
    contents = await file.read()
    # Chain tasks: process_image -> compress_image
    task = chain(process_image.s(encode_image(contents)), compress_image.s())()
    return {"task_id": task.id}

@app.get("/image-result/{task_id}")
async def get_image_result(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.failed():
        # For a failed task, task.result holds the raised exception
        raise HTTPException(status_code=500, detail=str(task.result))
    if task.ready():
        return {"status": "Complete", "result": "Image processed and compressed successfully"}
    return {"status": "Processing"}

In this updated version, we’ve implemented several advanced features:

  1. We’re using a separate configuration file for Celery.
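  2. We’ve defined task routes that send the two image tasks to separate high_priority and low_priority queues. A worker only consumes the queues it is started with, so run it with -Q high_priority,low_priority,celery (celery is the default queue, which the scheduled task uses).
  3. We’ve set up a scheduled task to run every hour (cleanup_old_tasks). The schedule only fires while a Celery Beat process is running, for example celery -A main.celery_app beat.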
  4. We’re using task chaining to process and then compress the image.
  5. We’ve improved error handling in our tasks and API endpoints.
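
For the error handling in the result endpoint, it can help to keep the mapping from task state to HTTP response in one small function. The sketch below is one possible mapping, not the only reasonable one: the describe_task name and the choice of status codes are assumptions, while the state names are Celery’s own.

```python
def describe_task(state, result=None):
    """Map a Celery task state to an (HTTP status code, JSON body) pair."""
    if state == "SUCCESS":
        return 200, {"status": "Complete", "result": result}
    if state == "FAILURE":
        # For failed tasks, result is the exception raised inside the task
        return 500, {"status": "Failed", "detail": str(result)}
    if state == "REVOKED":
        return 410, {"status": "Cancelled"}
    # PENDING, STARTED, and RETRY all mean there is no final outcome yet
    return 202, {"status": "Processing", "state": state}


if __name__ == "__main__":
    print(describe_task("SUCCESS", 8))
    print(describe_task("FAILURE", ValueError("bad image")))
    print(describe_task("PENDING"))
```

Inside an endpoint this could be called as describe_task(task.state, task.result). Keep in mind that Celery also reports PENDING for task IDs it has never seen, so "Processing" does not prove the task exists.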

This setup provides a robust foundation for building scalable applications with FastAPI and Celery. You can process time-consuming tasks in the background, handle high loads by distributing tasks across multiple workers, and create complex workflows using task chaining.

Remember, when deploying your application to production, you’ll want to consider using a process manager like Supervisor to keep your FastAPI server and Celery workers running. You might also want to use a reverse proxy like Nginx to handle incoming HTTP requests.

Integrating Celery with FastAPI opens up a world of possibilities for building high-performance, scalable web applications. Whether you’re processing images, generating reports, or handling any other time-consuming tasks, this combination allows you to keep your API responsive while offloading heavy lifting to background workers.

As you continue to develop your application, keep an eye on task performance and adjust your configuration as needed. You might find that you need to fine-tune your Celery settings, add more workers, or optimize your tasks for better performance. Always monitor your system in production to ensure it’s handling the load efficiently.
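
As a starting point for that fine-tuning, here is a sketch of a few Celery settings that are commonly adjusted. The setting names are Celery’s; the values are illustrative assumptions that would need to be measured against your own workload.

```python
# Candidate overrides for celery_app.conf; every value here is an assumption.
TUNING = {
    # Reserve one task at a time per worker process, so long tasks
    # do not sit behind each other in a single worker's buffer
    "worker_prefetch_multiplier": 1,
    # Acknowledge a message after the task finishes rather than before it starts
    "task_acks_late": True,
    # Raise an exception inside the task after 60 s, kill it after 90 s
    "task_soft_time_limit": 60,
    "task_time_limit": 90,
    # Drop stored results after one hour
    "result_expires": 3600,
}

if __name__ == "__main__":
    for name, value in sorted(TUNING.items()):
        print(f"{name} = {value}")
```

These could be applied in celery_config.py with celery_app.conf.update(TUNING). Late acknowledgement means a task may run more than once if a worker dies mid-task, so it suits tasks that are safe to repeat.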

Happy coding, and may your FastAPI and Celery powered applications be swift and scalable!

Keywords: FastAPI, Celery, asynchronous tasks, background processing, API performance, Python web framework, task queue, Redis, distributed computing, scalable applications


