Scheduling Tasks with Celery

Celery is a task queue with batteries included. It’s easy to use so that you can get started without learning the full complexities of the problem it solves. It’s designed around best practices so that your product can scale and integrate with other languages, and it comes with the tools and support you need to run such a system in production.

pip install celery

Choosing a Broker

Celery needs a way to send and receive messages, and this usually comes in the form of a separate service called a message broker. In this example, I run RabbitMQ in Docker to keep the experiment as simple as possible.

docker run -d -p 5672:5672 rabbitmq
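
To make the broker's role concrete, here is a toy, in-memory model of the flow used in the rest of this post. A producer puts a message that names the task (for example `tasks.add`, the name the worker lists under `[tasks]` below) and its arguments on a queue, and a worker takes the message off the queue and runs the matching function. This is a sketch under loud assumptions: `queue.Queue` stands in for RabbitMQ, and `REGISTRY`, `publish` and `toy_worker` are hypothetical names, not Celery or AMQP APIs.

```python
import queue
import threading
import uuid

# Hypothetical registry mapping a task name to a plain function.
# Celery's @app.task does much more; this only mimics the name lookup.
REGISTRY = {"tasks.add": lambda x, y: x + y}


def publish(broker, name, args):
    """Producer side: put a message describing the call on the queue."""
    message = {"id": str(uuid.uuid4()), "task": name, "args": args}
    broker.put(message)
    return message["id"]


def toy_worker(broker, results):
    """Consumer side: take messages off the queue and run the named task."""
    while True:
        message = broker.get()
        if message is None:  # sentinel used only to stop this toy worker
            break
        func = REGISTRY[message["task"]]
        results[message["id"]] = func(*message["args"])


broker = queue.Queue()  # stands in for RabbitMQ
results = {}
worker = threading.Thread(target=toy_worker, args=(broker, results))
worker.start()

task_id = publish(broker, "tasks.add", (16, 16))
broker.put(None)  # ask the worker to exit after it drains the queue
worker.join()
print(results[task_id])  # 32
```

The point of the separation is that the producer never calls `add` itself. It only describes the call, so producers and workers can run in different processes or on different machines.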

The Application

  • Define the task to run in Python

    # ./tasks.py

    from celery import Celery

    app = Celery('tasks', broker='amqp://localhost')

    @app.task
    def add(x, y):
        return x + y

  • Run the worker from the directory that contains tasks.py

    celery -A tasks worker -l info


    -------------- celery@Lap-012 v5.0.2 (singularity)
    --- ***** -----
    -- ******* ---- Linux-5.4.0-54-generic-x86_64-with-debian-bullseye-sid 2020-11-27 10:13:21
    - *** --- * ---
    - ** ---------- [config]
    - ** ---------- .> app: tasks:0x7f5b26a11750
    - ** ---------- .> transport: amqp://guest:**@localhost:5672//
    - ** ---------- .> results: disabled://
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
    -------------- [queues]
    .> celery exchange=celery(direct) key=celery

    [tasks]
    . tasks.add

    [2020-11-27 10:13:21,496: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    [2020-11-27 10:13:21,506: INFO/MainProcess] mingle: searching for neighbors
    [2020-11-27 10:13:22,528: INFO/MainProcess] mingle: all alone
    [2020-11-27 10:13:22,568: INFO/MainProcess] celery@Lap-012 ready.

  • The scheduler: add a few more lines to tasks.py to configure a periodic task

    # ./tasks.py

    from celery import Celery

    app = Celery('tasks', broker='amqp://localhost')

    @app.task
    def add(x, y):
        return x + y

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'

  • Run the scheduler: open a new terminal and run

    celery -A tasks beat -l info


    celery beat v5.0.2 (singularity) is starting.
    __ - ... __ - _
    LocalTime -> 2020-11-27 10:18:19
    Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
    [2020-11-27 10:18:19,508: INFO/MainProcess] beat: Starting...
    [2020-11-27 10:18:49,552: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)

  • Check the worker's terminal log to confirm that it received and ran the task

    [2020-11-27 10:19:19,546: INFO/MainProcess] Received task: tasks.add[b09eb21e-3e9f-459a-8069-d41b068d02c4]  
    [2020-11-27 10:19:19,549: INFO/ForkPoolWorker-5] Task tasks.add[b09eb21e-3e9f-459a-8069-d41b068d02c4] succeeded in 0.00027026102179661393s: 32
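
In the logs above, beat starts at 10:18:19 and sends the first `add-every-30-seconds` at 10:18:49. The worker then receives a `tasks.add` at 10:19:19, so one message goes out every 30 seconds. The following is a toy simulation of that timing for a `beat_schedule` dict of the same shape. It assumes that a float `schedule` first fires one interval after the scheduler starts, which matches these logs. It is not how `celery.beat` is implemented; the real `PersistentScheduler` tracks last-run times in the `celerybeat-schedule` file shown as `db` in the banner. `due_sends` is a hypothetical helper.

```python
# Same shape as app.conf.beat_schedule in tasks.py
beat_schedule = {
    "add-every-30-seconds": {
        "task": "tasks.add",
        "schedule": 30.0,
        "args": (16, 16),
    },
}


def due_sends(schedule, start, horizon):
    """Return sorted (time, entry_name, task, args) tuples for sends in (start, start + horizon].

    Toy model: each float interval is assumed to fire one interval after
    the scheduler starts, then once per interval after that.
    """
    sends = []
    for name, entry in schedule.items():
        interval = entry["schedule"]
        t = start + interval
        while t <= start + horizon:
            sends.append((t, name, entry["task"], entry["args"]))
            t += interval
    return sorted(sends)


for t, name, task, args in due_sends(beat_schedule, start=0.0, horizon=95.0):
    print(f"t={t:5.1f}s  send {name} ({task}) args={args}")
```

Beat only publishes these messages and the worker performs the addition. That is why the result, 32, appears in the worker's log and not in beat's.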
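
To check the log from a script instead of by eye, you could match the worker's "succeeded" line. This is a minimal sketch. The regular expression is derived only from the sample line above and is an assumption, not a documented or stable Celery log format. `parse_success` is a hypothetical helper.

```python
import re

# Pattern based on the sample "succeeded" line above; other Celery versions
# or logging configurations may format this line differently.
SUCCESS_RE = re.compile(
    r"Task (?P<task>[\w.]+)\[(?P<id>[0-9a-f-]+)\] "
    r"succeeded in (?P<secs>[\d.e-]+)s: (?P<result>.*)$"
)


def parse_success(line):
    """Return task name, id, runtime and result from a log line, or None if it doesn't match."""
    match = SUCCESS_RE.search(line)
    if match is None:
        return None
    info = match.groupdict()
    info["secs"] = float(info["secs"])  # runtime in seconds
    return info


line = (
    "[2020-11-27 10:19:19,549: INFO/ForkPoolWorker-5] Task tasks.add"
    "[b09eb21e-3e9f-459a-8069-d41b068d02c4] succeeded in 0.00027026102179661393s: 32"
)
print(parse_success(line))
```

The result is captured as the text the worker printed ("32"), because the log contains the result's string form and not the Python object.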

Putting it all together with docker-compose

  • ./Dockerfile

    FROM python:3.7

    RUN pip install celery

    WORKDIR /app

    COPY . .

  • ./docker-compose.yml

    version: '3.1'

    services:
      rabbitmq:
        image: rabbitmq:3.7
        restart: always

      worker:
        build: .
        restart: always
        environment:
          - MQ_HOST=rabbitmq
        depends_on:
          - rabbitmq
        command: celery -A tasks worker -l info

      sched:
        build: .
        environment:
          - MQ_HOST=rabbitmq
        depends_on:
          - rabbitmq
          - worker
        restart: always
        command: celery -A tasks beat -l info

  • ./tasks.py

    import os
    from celery import Celery

    MQ_HOST = os.environ.get('MQ_HOST', 'localhost')

    app = Celery('tasks', broker=f'amqp://{MQ_HOST}')

    @app.task
    def add(x, y):
        return x + y

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'
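
One caveat about the compose file: `depends_on` only controls start order. Compose starts `rabbitmq` before `worker` and `sched`, but it does not wait until RabbitMQ accepts connections. Celery's worker retries the broker connection on startup by default, and `restart: always` covers crashes, so this is usually harmless. If you want an explicit check, here is a minimal sketch. `wait_for_broker` is a hypothetical helper, not part of Celery, and it only tests TCP reachability on the AMQP port. It does not perform an AMQP login.

```python
import socket
import time


def wait_for_broker(host, port=5672, retries=30, delay=2.0):
    """Return True once a TCP connection to host:port succeeds, False after `retries` failed attempts."""
    for attempt in range(retries):
        try:
            # Succeeds if something is listening on the port; no AMQP handshake.
            with socket.create_connection((host, port), timeout=delay):
                return True
        except OSError:
            if attempt < retries - 1:
                time.sleep(delay)  # back off before the next attempt
    return False
```

In a container, you might call `wait_for_broker(os.environ.get('MQ_HOST', 'localhost'))` from a small entrypoint script before launching Celery. This reuses the same `MQ_HOST` variable that tasks.py reads.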