Celery en Python, Flask y Docker

Celery es una cola de tareas distribuida de código abierto que se utiliza para ejecutar tareas asincrónicas en uno o más trabajadores (workers) de fondo.

En un escenario típico, una aplicación web podría tener operaciones que son lentas o que requieren ser ejecutadas en segundo plano, como el envío de correos electrónicos, procesamiento de archivos grandes, llamadas a APIs externas, etc. Hacer estas operaciones de manera síncrona (es decir, en el mismo proceso que maneja las solicitudes web) no es ideal, ya que puede llevar a tiempos de espera largos para el usuario y una mala experiencia de usuario.

En este ejemplo vamos ha lanzar una aplicación cada 5 minutos para que haga su tarea.

¿Qué necesitamos para usar Celery?

Worker
Un worker (trabajador) es un proceso que realiza las tareas en la cola. Puede haber uno o más trabajadores ejecutándose, y pueden estar en la misma máquina que la aplicación principal o en servidores separado

Beat
Celery Beat es un planificador de tareas. Se utiliza para programar la ejecución de tareas en intervalos regulares, similar a cómo funcionan las tareas cron en sistemas Unix. Por ejemplo, podrías configurar Celery Beat para que ejecute una tarea cada noche a medianoche. Beat también se encarga de poner estas tareas programadas en la cola de tareas para que los trabajadores las ejecuten cuando llegue el momento.

RabbitMQ (o cualquier otro broker)
RabbitMQ es un servidor de mensajes que se utiliza a menudo como broker para Celery. El broker actúa como un medio de comunicación entre la aplicación principal y los trabajadores. Cuando la aplicación principal necesita que se ejecute una tarea, coloca un mensaje en la cola del broker. Los trabajadores escuchan esta cola y toman mensajes de ella para saber qué tareas necesitan ejecutar. Aunque RabbitMQ es un broker comúnmente utilizado, Celery también soporta otros brokers como Redis.

Para resumir:

  • Worker: Ejecuta las tareas.
  • Beat: Programa tareas para ser ejecutadas en el futuro.
  • RabbitMQ (Broker): Actúa como un intermediario que maneja la comunicación entre los trabajadores y la aplicación que envía tareas. Además de tener opcional, la extensión de administración con un panel web.

Todos estos componentes trabajan juntos para permitir la ejecución eficiente y escalable de tareas en segundo plano.

Configuraciones:

Configuración docker-compose:

version: '3.7'
services:
  python_app:
    build: 
      context: ./infrastructure/app_python
      dockerfile: Dockerfile
    ports:
      - ${API_FLASK_PORT}:4450
    volumes:
      - ./app:/app
    networks:
      - shared_network
    tty: true

  rabbitmq:
    image: "rabbitmq:management"
    ports:
      - "5672:5672"  # Puerto para la conexión AMQP
      - "15672:15672"  # Puerto para el panel de control de RabbitMQ (opcional, con la extensión de administración)
    environment:
      RABBITMQ_DEFAULT_USER: "admin"
      RABBITMQ_DEFAULT_PASS: "PASSW"
    networks:
      - shared_network
  
  celery-worker:
    build:
      context: ./infrastructure/app_python
      dockerfile: Dockerfile  # El mismo Dockerfile que usamos para app Flask
    volumes:
      - ./app:/app
    command: celery -A app_api.celery worker --loglevel=info # Asegúrate de reemplazar 'myapp' por el nombre del .py de FLASK!
    depends_on:
      - rabbitmq
    networks:
      - shared_network

  celery-beat:
    build:
      context: ./infrastructure/app_python
      dockerfile: Dockerfile  # El mismo Dockerfile que usamos para app Flask
    volumes:
      - ./app:/app
    command: celery -A app_api.celery beat --loglevel=debug # Asegúrate de reemplazar 'myapp' por el nombre del .py de FLASK!
    depends_on:
      - rabbitmq
    networks:
      - shared_network

networks:
  shared_network:
    external: true

volumes:
  db-admin-data:
    driver: local
    name: db-admin-data

Configuración DockerFile:

FROM python:3.8.10

WORKDIR /app

# Install uWSGI
RUN apt-get update -y

## Instalando Celery
# rabbitmq
RUN pip install celery[librabbitmq]

## Instalando requirements
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Crear un grupo y usuario
RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser

# Usar el usuario creado en los futuros comandos
USER appuser

## Ejecuciones:
#COPY . /app
CMD ["python","app_api.py"]

Configuración flask y celery en script principal app_api.py:

## IMPORTS

from flask import Flask, request, jsonify, abort
from celery import Celery

## Aplicación Flask y Celery y archivo configuracioens

app = Flask(__name__)

app.config.from_pyfile(‘config.py’)

celery = Celery(app.name, broker=app.config[‘CELERY_BROKER_URL’])

celery.conf.update(app.config)

# Guardamos la configuración del beat de Celery, porque no guardaba el diccionario, se puede probar sin esto pero por si ha alguien le pasa que no llega a cargar bien la configuración, añadir esto para que lo actualice y no lo pase por alto.

celery.conf.beat_schedule = app.config.get(‘CELERY_BEAT_SCHEDULE’, {})

def pagina_web_ejemplo():

    return ‘<h1>Hola desde Tomonota</h1>’

# Lo que ejecutará la tarea cada x minutos que tengamos en la configuración del beat!, Acordaros de esta función para luego en el archivo de config.
@celery.task
def mytask():
print(‘Esto lo imprime cada 5 minutos!)

# Por si no existe la web que se solicita
app.register_error_handler(404, pagina_no_encontrada)

if __name__ == ‘__main__’:
app.run(host=’0.0.0.0′, port=4450)#, debug=True)


Configuración del config.py para Flask y Celery:

from datetime import timedelta

### FLASK
# Aquí irán todas las configuraciones de flask, dejo una de ejemplo.
# Descargar archivos de 50Mb
MAX_CONTENT_LENGTH = 50 * 1024 * 1024


### Celery

# Configuración RabittMQ
CELERY_BROKER_USER = 'admin'
CELERY_BROKER_PASSWORD = 'PASSW'
CELERY_BROKER_HOST = 'rabbitmq'
CELERY_BROKER_PORT = '5672'

###Conexión con el worker:
##Para ejecutar con librabbitmq - Se aconseja esta config, pero yo usé la siguiente por un error en la conexión del worker.
#CELERY_BROKER_URL = f'amqp://{CELERY_BROKER_USER}:{CELERY_BROKER_PASSWORD}@rabbitmq:5672//'

###Si existe algún problema a la hora de conexión con el worker probar con pyamqp:
CELERY_BROKER_URL = f'pyamqp://{CELERY_BROKER_USER}:{CELERY_BROKER_PASSWORD}@rabbitmq:5672//'



# Configuración para Celery Beat
# En task: Es el nombre del script  donde se encuentra la función y el nombre de dicha función. Como podeis ver es la función que hemos visto antes.
CELERY_BEAT_SCHEDULE = {
                            'run-every-5-minutes': {
                            'task': 'app_api.mytask', 
                            'schedule': timedelta(minutes=5),
                            },
                        }

Configuración web Admin RabbitMQ:

La extensión de administración del panel web, si lo recordáis hemos abierto el puerto 15672
Si accedemos a la dirección web ip_página:15672 podremos acceder, os paso captura:


Y hasta aquí con mi pequeño aporte para hacer funcionar Celery en python con flask y docker!
Con estas notas podréis lanzaros y probar Celery como gestor de tareas.

Espero que os sirva de ayuda.

Salu2!

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *