Celery is a distributed system for processing messages on a task queue with a focus or real-time processing and support for task scheduling. Celery communicates with a message broker and distributes work. Here is an boilerplate example of Celery.
#project/celery.py
from celery import Celery
from django.conf import settings
app = Celery('appname', broker=settings.BROKER_URL)
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS)
celery -A appname worker
app
is the app instance of all project. Which means it is only possible to have one Celery running per project. The appname
is the name of the application and the broker
is URL of the message broker where the worker is going to attach.
Celery uses a @task
decorator that gives the ability to hook to the celery.
@app.task
def generate_thumbnails_task(image_path, sizes):
for size in sizes:
make_thumb(image_path, size)
Celery is used mostly for:
- Generating assets after upload
- Notifying a set of users when an event happens.
- Keeping a search index up to date
- Replacing cronjobs (backups, cleanup)
The message broker have the role to send the tasks to the consumers. The consumer is a process that has the role to consume tasks from the queue. The consumer then delegates the tasks to the worker pool to be processed.
Celery was made to work with RabbitMQ, although it can work with other message brokers like Redis. RabbitMQ implements AMQP (Advanced Message Queuing Protocol) that delivers and tracks message acknowledgments. AMQP backend spawns an Erlang process and a queue for each task, which is very expensive. So, in some cases it is useful to keep a cache for previous executed tasks and save these results in key/value store.
#settings.py
CELERY_RESULT_BACKEND = 'redis://:password@host:port/db’
Some tasks can return data like this.
@app.task
def add(a, b):
return a + b
So, to track the state of a task and its return values, hen execution is finished a "result backend" is used.
When using celery, it is necessary to define the number of workers. To not impact performance, it is recommended that the number of workers be n+1 workers (n is the number of cores).
celery worker -A appname -c 5
celery worker -A appname -P eventlet -c 1000
celery worker -A appname -P gevent -c 1000
There are some pitfalls that is necessary to take care when using Celery:
- Parallel Safety
- Poor Task Granularity
- Misguided Optimizations
In the case of Parallel Safety, you need to write tasks to be parallel safe. If you had no idea when tasks run, you need to plan. If you are trying to disallow certain tasks from running in parallel because of some possible deadlocks, you are focusing on the wrong problem. It means that you need to design better your application. Don't pass objects as task parameters use values like JSON or YAML, and be careful to not have parallel transactions in the database because you can return the wrong value.
In this example, it is used report_id
to avoid the PDF being generated if there is already the up-to-date version. When two requests ran in parallel, just one request will be accepted.
#tasks.py
from datetime import datetime
from project.celery import app
from project.utils import create_pdf_from_html
@app.task
def generate_pdf_report(report_id):
report = Report.objects.get(id=report_id)
if report.pdf_last_modified < report.html_last_modified:
pdf_data = create_pdf_from_html(report.html)
now = datetime.now()
Report.objects.filter(id=report_id).update(
pdf_data=pdf_data, pdf_last_modified=now)
When creating a task, it is necessary to have the right granularity of a task. Too big task code is hard to read and maintain and takes many seconds, even minutes to run. Also, it is harder to scale horizontally. When you design tasks around small units, you can execute them in parallel. Don’t parallel everything, because some problems can only execute in sequential.
#tasks.py
from project.celery import app
from forum.utils import notify_user
from forum.models import ForumPost
@app.task
def notify_forum_users(post_id):
post = ForumPost.objets.get(post_id)
users = post.parent_thread.users.all()
for user in users:
notify_user(user)
Granularity Problem
#tasks.py
from project.celery import app
from forum.utils import notify_user
from forum.models import ForumPost
@app.task
def notify_forum_users(post_id):
post = ForumPost.objets.get(id=post_id)
users = post.parent_thread.users.all()
for user in users:
notify_user_task.delay(user.id)
@app.task
def notify_user_task(user_id)
user = User.objects.get(id=user_id)
notify_user(user)
Granularity Solution
When optimizing Celery, don't just turn the number of workers up without measuring. Too much concurrency and resources become constrained. Like most code, network and storage I/O tend to be the bottleneck. Also profile your code if it is slow and look for slow DB queries, storage or network intensive work.
This notes came from bit.ly/1k1fVLz
.