07 de março de 2026
-
RabbitMQ + Celery no Flask (e como testar)
No post anterior falei sobre Celery com Redis. O RabbitMQ faz a mesma coisa, mas é um message broker dedicado. Na prática você só troca o broker='redis://...' por broker='amqp://...' e o Celery funciona igual.
Estrutura do projeto
app/
├── __init__.py
├── celery_app.py
├── tasks.py
└── routes.py
tests/
└── test_tasks.py
Instalação
pip install flask celery[rabbitmq] pytest
Configurando o Celery com Flask
O ponto aqui é garantir que as tasks tenham acesso ao contexto do Flask. Pra isso, cria-se uma subclasse de Task que inicializa o app_context automaticamente.
app/celery_app.py
from celery import Celery
from flask import Flask
def make_celery(app: Flask) -> Celery:
celery = Celery(
app.import_name,
broker=app.config["CELERY_BROKER_URL"],
backend=app.config["CELERY_RESULT_BACKEND"],
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
app/__init__.py
from flask import Flask
from app.celery_app import make_celery
celery = None
def create_app(config: dict = None) -> Flask:
global celery
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL="amqp://guest:guest@localhost:5672//",
CELERY_RESULT_BACKEND="rpc://",
)
if config:
app.config.update(config)
celery = make_celery(app)
from app.routes import bp
app.register_blueprint(bp)
return app
Tasks
app/tasks.py
from app import celery
import time
@celery.task(bind=True, max_retries=3)
def send_email(self, recipient: str, subject: str, body: str) -> dict:
try:
time.sleep(1)
return {"status": "sent", "recipient": recipient}
except Exception as exc:
raise self.retry(exc=exc, countdown=5)
@celery.task
def process_report(data: list) -> dict:
total = sum(data)
average = total / len(data) if data else 0
return {"total": total, "average": average, "count": len(data)}
Rotas
app/routes.py
from flask import Blueprint, request, jsonify
from app.tasks import send_email, process_report
bp = Blueprint("main", __name__)
@bp.route("/email", methods=["POST"])
def queue_email():
payload = request.get_json()
task = send_email.delay(
recipient=payload["recipient"],
subject=payload["subject"],
body=payload["body"],
)
return jsonify({"task_id": task.id}), 202
@bp.route("/report", methods=["POST"])
def queue_report():
payload = request.get_json()
task = process_report.delay(payload["data"])
return jsonify({"task_id": task.id}), 202
Rodando
RabbitMQ via Docker (com a UI de gerenciamento):
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Worker do Celery:
celery -A app.celery worker --loglevel=info
UI do RabbitMQ
A imagem rabbitmq:management sobe uma interface web em http://localhost:15672. O login padrão é guest / guest.
De lá é possível visualizar e controlar tudo em tempo real:
- Queues: ver quantas mensagens estão em cada fila, quantas foram entregues, quantas estão aguardando e quantas foram rejeitadas
- Messages: publicar mensagens manualmente em uma fila ou inspecionar o conteúdo das mensagens sem removê-las (modo "peek")
- Consumers: ver quais workers estão conectados e consumindo de cada fila
- Exchanges: ver como as mensagens estão sendo roteadas
- Admin: criar usuários, definir permissões por virtual host e gerenciar conexões ativas
Uma coisa útil é a aba de "Get messages" dentro de uma fila, que permite ver as mensagens que estão presas sem precisar de código. Isso ajuda bastante quando algo não está sendo processado e você quer entender o que está na fila.
Gerenciamento de tarefas
Quando uma rota dispara uma task com .delay(), ela retorna um task_id. Com esse id é possível consultar o status e o resultado da tarefa.
@bp.route("/task/<task_id>", methods=["GET"])
def get_task_status(task_id: str):
from app import celery
result = celery.AsyncResult(task_id)
return jsonify({"status": result.status, "result": result.result})
Os possíveis status são:
PENDING- a task ainda não foi executada ou não existeSTARTED- o worker pegou a task e está executandoSUCCESS- finalizou com sucessoFAILURE- deu erroRETRY- está aguardando para tentar de novo
Também é possível revogar uma task antes que o worker execute:
from app import celery
celery.control.revoke(task_id, terminate=True)
E inspecionar o que está na fila em tempo real:
inspect = celery.control.inspect()
print(inspect.active()) # tasks sendo executadas agora
print(inspect.reserved()) # tasks aguardando na fila
print(inspect.scheduled()) # tasks agendadas