Scheduled Task Runner with Celery
Celery is a task queue with batteries included. It’s easy to use so that you can get started without learning the full complexities of the problem it solves. It’s designed around best practices so that your product can scale and integrate with other languages, and it comes with the tools and support you need to run such a system in production.
pip install celery
Choosing a Broker
Celery requires a solution to send and receive messages; usually this comes in the form of a separate service called a message broker. In this example, I run RabbitMQ on Docker, which is the simplest way to get started.
docker run -d -p 5672:5672 rabbitmq
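If you'd also like RabbitMQ's web management UI (handy for watching queues fill and drain), the `3-management` image variant is an alternative to the plain image used above:

```shell
# Same broker, plus the management web UI on http://localhost:15672
# (default credentials guest/guest work only from localhost)
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
```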
The Application
First, the Python code that defines the task to run:
# ./tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://localhost')

@app.task
def add(x, y):
    return x + y

Run the worker:
celery -A tasks worker -l info
-------------- celery@Lap-012 v5.0.2 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.0-54-generic-x86_64-with-debian-bullseye-sid 2020-11-27 10:13:21
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f5b26a11750
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2020-11-27 10:13:21,496: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-11-27 10:13:21,506: INFO/MainProcess] mingle: searching for neighbors
[2020-11-27 10:13:22,528: INFO/MainProcess] mingle: all alone
[2020-11-27 10:13:22,568: INFO/MainProcess] celery@Lap-012 ready.

The Scheduler

Let's add a little more code to configure a scheduled task:
# ./tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://localhost')

@app.task
def add(x, y):
    return x + y
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

Run the scheduler: open a new terminal and run
celery -A tasks beat -l info
celery beat v5.0.2 (singularity) is starting.
__ - ... __ - _
LocalTime -> 2020-11-27 10:18:19
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2020-11-27 10:18:19,508: INFO/MainProcess] beat: Starting...
[2020-11-27 10:18:49,552: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)

In the worker's terminal log you can see the task being received and executed:
[2020-11-27 10:19:19,546: INFO/MainProcess] Received task: tasks.add[b09eb21e-3e9f-459a-8069-d41b068d02c4]
[2020-11-27 10:19:19,549: INFO/ForkPoolWorker-5] Task tasks.add[b09eb21e-3e9f-459a-8069-d41b068d02c4] succeeded in 0.00027026102179661393s: 32
Putting It All Together with docker-compose
./Dockerfile
FROM python:3.7
RUN pip install celery
WORKDIR /app
COPY . .

./docker-compose.yml
version: '3.1'

services:
  rabbitmq:
    image: rabbitmq:3.7
    restart: always

  worker:
    build: .
    restart: always
    environment:
      - MQ_HOST=rabbitmq
    depends_on:
      - rabbitmq
    command: celery -A tasks worker -l info

  sched:
    build: .
    environment:
      - MQ_HOST=rabbitmq
    depends_on:
      - rabbitmq
      - worker
    restart: always
    command: celery -A tasks beat -l info

./tasks.py
import os

from celery import Celery

MQ_HOST = os.environ.get('MQ_HOST', 'localhost')

app = Celery('tasks', broker=f'amqp://{MQ_HOST}')

@app.task
def add(x, y):
    return x + y
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'