Pedro Fonseca
Omnes enim Christus, nihil sine Maria

07 de março de 2026

-

RabbitMQ + Celery no Flask (e como testar)

No post anterior falei sobre Celery com Redis. O RabbitMQ faz a mesma coisa, mas é um message broker dedicado. Na prática você só troca o broker='redis://...' por broker='amqp://...' e o Celery funciona igual.

Estrutura do projeto

app/
├── __init__.py
├── celery_app.py
├── tasks.py
└── routes.py
tests/
└── test_tasks.py

Instalação

pip install flask celery[rabbitmq] pytest

Configurando o Celery com Flask

O ponto aqui é garantir que as tasks tenham acesso ao contexto do Flask. Pra isso, cria-se uma subclasse de Task que inicializa o app_context automaticamente.

from celery import Celery
from flask import Flask


def make_celery(app: Flask) -> Celery:
    celery = Celery(
        app.import_name,
        broker=app.config["CELERY_BROKER_URL"],
        backend=app.config["CELERY_RESULT_BACKEND"],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
from flask import Flask
from app.celery_app import make_celery

celery = None


def create_app(config: dict = None) -> Flask:
    global celery

    app = Flask(__name__)
    app.config.update(
        CELERY_BROKER_URL="amqp://guest:guest@localhost:5672//",
        CELERY_RESULT_BACKEND="rpc://",
    )

    if config:
        app.config.update(config)

    celery = make_celery(app)

    from app.routes import bp
    app.register_blueprint(bp)

    return app

Tasks

from app import celery
import time


@celery.task(bind=True, max_retries=3)
def send_email(self, recipient: str, subject: str, body: str) -> dict:
    try:
        time.sleep(1)
        return {"status": "sent", "recipient": recipient}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)


@celery.task
def process_report(data: list) -> dict:
    total = sum(data)
    average = total / len(data) if data else 0
    return {"total": total, "average": average, "count": len(data)}

Rotas

from flask import Blueprint, request, jsonify
from app.tasks import send_email, process_report

bp = Blueprint("main", __name__)


@bp.route("/email", methods=["POST"])
def queue_email():
    payload = request.get_json()
    task = send_email.delay(
        recipient=payload["recipient"],
        subject=payload["subject"],
        body=payload["body"],
    )
    return jsonify({"task_id": task.id}), 202


@bp.route("/report", methods=["POST"])
def queue_report():
    payload = request.get_json()
    task = process_report.delay(payload["data"])
    return jsonify({"task_id": task.id}), 202

Rodando

RabbitMQ via Docker (com a UI de gerenciamento):

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Worker do Celery:

celery -A app.celery worker --loglevel=info

UI do RabbitMQ

A imagem rabbitmq:management sobe uma interface web em http://localhost:15672. O login padrão é guest / guest.

De lá é possível visualizar e controlar tudo em tempo real:

Uma coisa útil é a aba de "Get messages" dentro de uma fila, que permite ver as mensagens que estão presas sem precisar de código. Isso ajuda bastante quando algo não está sendo processado e você quer entender o que está na fila.

Gerenciamento de tarefas

Quando uma rota dispara uma task com .delay(), ela retorna um task_id. Com esse id é possível consultar o status e o resultado da tarefa.

@bp.route("/task/<task_id>", methods=["GET"])
def get_task_status(task_id: str):
    from app import celery
    result = celery.AsyncResult(task_id)
    return jsonify({"status": result.status, "result": result.result})

Os possíveis status são:

Também é possível revogar uma task antes que o worker execute:

from app import celery

celery.control.revoke(task_id, terminate=True)

E inspecionar o que está na fila em tempo real:

inspect = celery.control.inspect()

print(inspect.active())    # tasks sendo executadas agora
print(inspect.reserved())  # tasks aguardando na fila
print(inspect.scheduled()) # tasks agendadas