In this tutorial I will show you when to start using multiple queues in Celery and how to do it.

Our hypothetical app will have 3 categories of Celery tasks:

  • basic tasks that power the interface of the app
  • long-running tasks that process uploaded files
  • tasks that interact with a 3rd party API and come in large numbers

For example, it could be a tool that lets users upload contact information and then checks whether the email domains have MX records, to see if it makes sense to send emails to those addresses.

The first type of task must run fast in order for the user to have a comfortable experience.

The second type of task can take a long time to complete and shouldn't block other types of tasks.

The third type of task has another characteristic: each of them usually runs fast, but there can be a lot of them, which can leave other tasks stuck at the end of the queue, waiting for this horde to be processed.

Note: The goal of this article is to show how to work with 3 different types of Celery tasks in multiple queues:

  • small in numbers, but high-priority tasks: the default queue,
  • long-running tasks: the long queue,
  • a huge amount of small tasks: the numerous queue.

Other parts of the app are for illustration purposes and can be far from being optimal.

Let's start!

Source code

Repository with the source code for this tutorial: https://github.com/appliku/celery_multiple_queues

Setup the project

Let's start by getting a fresh Djangitos project template.

curl -sSL https://appliku.com/djangitos.zip > djangitos.zip
unzip djangitos.zip


mv djangitos-master celery_multiple_queues
cd celery_multiple_queues
cp start.env .env

Requirements

These are the packages we will be actively relying on in this tutorial:

Django==3.2.7
django-redis==5.0.0
celery==5.1.2
celery-redbeat==2.0.0
arrow==1.1.1
django-environ==0.6.0
django-extensions==3.1.3
factory-boy==3.2.0
Faker==8.14.0

The rest of the requirements you can find here: https://github.com/appliku/djangitos/blob/master/requirements.txt

You can explore the Celery setup in djangito/celeryapp.py and in the Procfile, where you can find the commands used to run the Celery worker and scheduler.

By the way, for the scheduler we use celery-redbeat.
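For reference, here is a minimal sketch of what such a Celery app setup typically looks like. This is an assumption-based sketch, not the exact contents of djangito/celeryapp.py; the REDIS_URL variable and module paths are guesses based on the project layout:

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangito.settings')

app = Celery('djangito')

# Read CELERY_* settings from Django settings (e.g. CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')

# Pick up tasks.py modules from all installed apps (including myapp/tasks.py)
app.autodiscover_tasks()

# celery-redbeat keeps the beat schedule in Redis instead of a local file,
# so the scheduler survives restarts and can move between hosts
app.conf.redbeat_redis_url = os.environ.get('REDIS_URL', 'redis://redis:6379/0')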

Create an app

Create an app myapp

docker-compose run web python manage.py startapp myapp

Add it to installed apps in djangito/settings.py to the list PROJECT_APPS:

PROJECT_APPS = [
    'usermodel',
    'ses_sns',
    'myapp',  # new
]

Create an empty file myapp/tasks.py. Then create myapp/urls.py with the following content:

from django.urls import path
from myapp import views

urlpatterns = [

]

In djangito/urls.py add myapp urls to the root URLConf:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('', include('myapp.urls')),  # new
    path('sns/', include('ses_sns.urls')),
    path('admin/', admin.site.urls),
    path('ckeditor/', include('ckeditor_uploader.urls')),
]

Let's prepare the base template.

Create a file myapp/templates/myapp/base.html with this content:

<!DOCTYPE html>
<html lang="en">
<head>
    <title>{% block title %}{% endblock %}</title>
    <meta charset="utf-8">
</head>

<body>
{% block content %}{% endblock %}
</body>
</html>

Now let's start building the view.

Create models and views

Our app needs 2 main models:

  • contact list uploads
  • contacts

The contact list uploads model will be used for uploading and processing files.

The Contact model will hold email records and will be linked by ID to the upload model.

Put this code into myapp/models.py:

from django.db import models

from myapp.tuples import CONTACT_UPLOAD_STATUSES


class ContactUpload(models.Model):
    STATUS_CHOICES = (
        (CONTACT_UPLOAD_STATUSES.pending, 'Pending'),
        (CONTACT_UPLOAD_STATUSES.processing, 'Processing'),
        (CONTACT_UPLOAD_STATUSES.finished, 'Finished'),
        (CONTACT_UPLOAD_STATUSES.failed, 'Failed'),
    )
    contact_file = models.FileField(upload_to='uploads/%Y/%m/%d/')
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(
        choices=STATUS_CHOICES,
        default=CONTACT_UPLOAD_STATUSES.pending
    )

    class Meta:
        verbose_name = 'Contact Upload'
        verbose_name_plural = 'Contact Uploads'
        ordering = ('-pk',)


class Contact(models.Model):
    email = models.EmailField()
    has_mx_records = models.BooleanField(default=None, null=True)
    upload = models.ForeignKey(ContactUpload, on_delete=models.CASCADE)

    class Meta:
        verbose_name = 'Contact'
        verbose_name_plural = 'Contacts'
        ordering = ('email',)

Create a file myapp/tuples.py where we define a namedtuple called CONTACT_UPLOAD_STATUSES:

from collections import namedtuple

CONTACT_UPLOAD_STATUSES = namedtuple(
    'CONTACT_UPLOAD_STATUSES',
    "pending processing finished failed")._make(range(4))

Let's create migrations for our new models and apply them.

docker-compose run web python manage.py makemigrations myapp
docker-compose run web python manage.py migrate

Now let's create views for uploading our contact lists.

We will need a view to list all uploads, view for creating upload and a detail view where we list all our contacts.

Add this to myapp/views.py:

from django.views.generic import CreateView, DetailView, ListView

from myapp.models import ContactUpload
from myapp.tuples import CONTACT_UPLOAD_STATUSES


class ContactUploadListView(ListView):
    model = ContactUpload
    template_name = 'myapp/list.html'


class ContactUploadCreateView(CreateView):
    template_name = 'myapp/create.html'
    model = ContactUpload
    fields = ('contact_file',)


class ContactUploadDetailView(DetailView):
    template_name = 'myapp/detail.html'
    model = ContactUpload
    object: ContactUpload

    def get_context_data(self, **kwargs):
        kwargs = super().get_context_data(**kwargs)
        if self.object.status == CONTACT_UPLOAD_STATUSES.finished:
            kwargs['processing_finished'] = True
        return kwargs

One thing to note here is get_context_data in ContactUploadDetailView. We need it to decide whether to show the number of imported contacts, which I chose to do only when the upload is complete and successful. We do this comparison in the view to avoid comparing against the namedtuple value in the template.

Urls in myapp/urls.py will look this way:

from django.urls import path
from myapp import views

urlpatterns = [
    path("", views.ContactUploadListView.as_view(), name="contact_upload_list"),
    path("create", views.ContactUploadCreateView.as_view(), name="contact_upload_create"),
    path("detail/<pk>", views.ContactUploadCreateView.as_view(), name="contact_upload_detail"),
]

Next we need 3 templates.

Upload form with multipart attribute to support file uploads in myapp/templates/myapp/create.html:

{% extends "myapp/base.html" %}
{% block title %}Upload new file{% endblock %}

{% block content %}
    <form action="" method="post" enctype="multipart/form-data">
        {{ form.as_p }}
        {% csrf_token %}
        <button type="submit">Upload</button>
    </form>
{% endblock %}

List of our uploads in myapp/templates/myapp/list.html:

{% extends 'myapp/base.html' %}
{% block title %}List of Uploads{% endblock %}
{% block content %}
    <a href="{% url "contact_upload_create" %}">Upload New File</a>
    <table>
        <tr>
            <th>ID</th>
            <th>FILE</th>
            <th>DT ADDED</th>
            <th>STATUS</th>
        </tr>
        {% for object in object_list %}
            <tr>
                <td><a href="{% url "contact_upload_detail" object.pk %}">{{ object.pk }}</a></td>
                <td>{{ object.contact_file.name }}</td>
                <td>{{ object.created_dt }}</td>
                <td>{{ object.get_status_display }}</td>
            </tr>
        {% endfor %}
    </table>

{% endblock %}

Details of our contact upload in myapp/templates/myapp/detail.html:

{% extends 'myapp/base.html' %}
{% block title %}Viewing Contacts Upload #{{ object.pk }}{% endblock %}

{% block content %}
    <h1>{{ object.contact_file.name }}</h1>
    <h2>Status: {{ object.get_status_display }}</h2>
    {% if processing_finished %}
        <h3>Total contacts found: {{ object.contact_set.all.count }}</h3>
    {% else %}
        <script>
            setTimeout(function () {
                window.location.reload();
            }, 3000);
        </script>
    {% endif %}

    <table>
        <tr>
            <th>Email</th>
            <th>MX Found</th>
        </tr>
        {% for contact in object.contact_set.all %}
            <tr>
                <td>{{ contact.email }}</td>
                <td>{{ contact.has_mx_records|yesno:"✅,❌,⁇" }}</td>
            </tr>
        {% endfor %}
    </table>
{% endblock %}

I used emoji here for simplicity and for readability of the "MX Found" column in the table. In a production-grade app I might use Font Awesome or something similar.

Now make sure your app is running; if not, run docker-compose up.

Open our app at http://0.0.0.0:8060/

Click on "Upload new file".

You will see a very simple form with a file upload widget.

We don't have anything to upload yet. Let's make a view that generates a fake contacts file.

A Django view to generate a CSV file with Faker

We will write a small view that generates fake data and responds with a FileResponse.

Edit myapp/views.py. Add the following imports and a new view:

from io import BytesIO

from django.http import FileResponse
from django.views import View
from faker import Faker

class GenerateFakeContactList(View):
    def generate_data(self, number_contacts):
        fake = Faker('en_US')

        # Build the file in memory: one fake email per line
        memory_file = BytesIO()
        content = '\n'.join([fake.email() for _ in range(number_contacts)]).encode('utf-8')
        memory_file.write(content)
        memory_file.seek(0)  # rewind so FileResponse reads from the start
        return memory_file

    def get(self, request, *args, **kwargs):
        number_contacts = 100
        if request.GET.get('number_contacts'):
            number_contacts = int(request.GET.get('number_contacts'))
        return FileResponse(self.generate_data(number_contacts), filename="output.csv", as_attachment=True)

Also add this new view to myapp/urls.py

from django.urls import path
from myapp import views

urlpatterns = [
    path("", views.ContactUploadListView.as_view(), name="contact_upload_list"),
    path("create", views.ContactUploadCreateView.as_view(), name="contact_upload_create"),
    path("detail/<pk>", views.ContactUploadCreateView.as_view(), name="contact_upload_detail"),
    path("csv", views.GenerateFakeContactList.as_view(), name="contact_generate"),  # new
]

Now open it in the browser and your .csv file should be downloaded. You can control the number of rows with the number_contacts query parameter, e.g. ?number_contacts=500.

http://0.0.0.0:8060/csv

Celery task to process an uploaded file

Now let's make 3 tasks.

  • The first one processes a file upon upload
  • The second one processes each contact
  • The third one refreshes the cache of contact lists

Then we'll update ContactUploadCreateView to call the Celery task when a file is uploaded.

The first task will go into our queue for long tasks, the second into the third queue for numerous tasks, and the third into the default queue, which is for high-priority tasks responsible for user-facing data.

Put this code into myapp/tasks.py:

import logging
import dns.resolver
from celery import shared_task, current_app
from django.core.cache import cache
from django.db import transaction
from django.urls import reverse

from myapp.models import ContactUpload, Contact
from myapp.tuples import CONTACT_UPLOAD_STATUSES

logger = logging.getLogger(__name__)


@shared_task(name="process_uploaded_file")
def process_uploaded_file(upload_id: int):
    try:
        contact_upload = ContactUpload.objects.get(pk=upload_id)
    except ContactUpload.DoesNotExist:
        return
    contact_upload.status = CONTACT_UPLOAD_STATUSES.processing
    contact_upload.save(update_fields=['status', ])
    try:
        file_data = contact_upload.contact_file.read()
        lines = file_data.decode('utf-8').split('\n')
        for line in lines:
            if not line.strip():
                continue  # skip blank lines, e.g. a trailing newline
            contact = contact_upload.contact_set.create(email=line)
            # Bind contact.id as a default argument: a plain closure over
            # `contact` would send the last contact's id for every task
            transaction.on_commit(
                lambda contact_id=contact.id: current_app.send_task(
                    'process_contact_mx_records',
                    kwargs={"contact_id": contact_id},
                    queue="numerous"))
        contact_upload.status = CONTACT_UPLOAD_STATUSES.finished
        contact_upload.save(update_fields=['status', ])
    except Exception as e:
        # The model has no error_message field, so log the error instead
        logger.error(f"Failed to process upload {contact_upload.pk}: {e}")
        contact_upload.status = CONTACT_UPLOAD_STATUSES.failed
        contact_upload.save(update_fields=['status', ])


@shared_task(name="process_contact_mx_records")
def process_contact_mx_records(contact_id: int):
    try:
        contact = Contact.objects.get(pk=contact_id)
    except Contact.DoesNotExist:
        return False
    domain = contact.email.split('@')[1]
    try:
        nameservers = dns.resolver.resolve(domain, rdtype='MX', search=True)
    except Exception as e:
        logger.error(f"Exception while getting MX records for domain {domain}: {e}")
        contact.has_mx_records = False
        contact.save(update_fields=['has_mx_records', ])
        return
    logger.debug(f"{nameservers} {dir(nameservers)}")
    contact.has_mx_records = len(list(nameservers)) > 0
    contact.save(update_fields=['has_mx_records', ])


@shared_task(name="update_contact_lists_numbers")
def update_contact_lists_numbers():
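    # cache.lock() comes from django-redis; it prevents two overlapping
    # beat runs from rebuilding the list at the same time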
    with cache.lock("update_contact_list_numbers", timeout=60, blocking_timeout=1):
        contact_lists = [{
            "pk": c.pk,
            "url": reverse('contact_upload_detail', args=(str(c.pk),)),
            "file": c.contact_file.name,
            "status": c.get_status_display()
        } for c in ContactUpload.objects.all()]
        cache.set("contact_lists", contact_lists, 60 * 2)

The process_uploaded_file task will be called upon file upload with the ID of the ContactUpload instance, via the long queue.

As it goes over the rows in the file, it will create process_contact_mx_records tasks with the ID of each contact, via the numerous queue.
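If you want to check the routing by hand, open the Django shell with docker-compose run web python manage.py shell and send a task manually. The upload_id=1 below is a placeholder; use the pk of an existing ContactUpload:

from celery import current_app

# Send the task straight to the "long" queue, bypassing the upload view
current_app.send_task(
    "process_uploaded_file",
    kwargs={"upload_id": 1},  # placeholder pk
    queue="long",
)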

Finally, we should add update_contact_lists_numbers to the scheduled tasks in djangito/celeryapp.py, in app.conf.beat_schedule:

app.conf.beat_schedule = {
    'update_contact_lists_numbers': {
        'task': 'update_contact_lists_numbers',
        'schedule': 60,
        'options': {
            'ignore_result': True,
            'expires': 60,
            'queue': 'default',
        }
    },
}

It will be executed every 60 seconds, sent to the default queue, and will expire after 60 seconds if it has not been picked up in time.
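If you prefer cron-style timing over a plain interval, Celery's crontab schedules also work here; for example, this sketch runs the same task at the start of every minute:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'update_contact_lists_numbers': {
        'task': 'update_contact_lists_numbers',
        'schedule': crontab(),  # crontab() with no arguments means "every minute"
        'options': {'queue': 'default'},
    },
}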

Let's update our views in myapp/views.py:

from celery import current_app
from django.core.cache import cache
from django.db import transaction
from django.urls import reverse
from django.views.generic import CreateView, DetailView, TemplateView


class ContactUploadListView(TemplateView):
    model = ContactUpload
    template_name = 'myapp/list.html'

    def get_context_data(self, **kwargs):
        kwargs = super().get_context_data(**kwargs)
        kwargs['object_list'] = cache.get('contact_lists', [])
        return kwargs


class ContactUploadCreateView(CreateView):
    template_name = 'myapp/create.html'
    model = ContactUpload
    fields = ('contact_file',)

    def get_success_url(self):
        return reverse('contact_upload_detail', args=(str(self.object.pk),))

    def form_valid(self, form):
        response = super().form_valid(form)
        transaction.on_commit(
            lambda: current_app.send_task(
                "process_uploaded_file",
                kwargs={"upload_id": self.object.id}, queue="long"))
        return response

Now our ContactUploadListView is actually a TemplateView, and it will only use cached data instead of an ORM call.

This whole caching exercise is meant to demonstrate that we can have priority tasks that affect the user experience and must run without being blocked by other tasks. Another example of such a high-priority task is sending emails for password resets. Nobody wants to wait several hours for their password reset instructions to arrive.
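Such a task could look like the sketch below. Note that send_password_reset_email is not part of this project; it is only an illustration of a user-facing task that belongs in the fast default queue:

from celery import shared_task
from django.core.mail import send_mail


@shared_task(name="send_password_reset_email")
def send_password_reset_email(email: str, reset_link: str):
    # Hypothetical task: small, user-facing, and should never wait
    # behind file processing or MX checks
    send_mail(
        subject="Password reset",
        message=f"Use this link to reset your password: {reset_link}",
        from_email=None,  # falls back to DEFAULT_FROM_EMAIL
        recipient_list=[email],
    )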

In ContactUploadCreateView we have added a get_success_url method and an override of form_valid that, on DB transaction commit, sends a Celery task for the new ContactUpload instance.

Also update the template myapp/templates/myapp/list.html for the new data format:

{% extends 'myapp/base.html' %}
{% block title %}List of Uploads{% endblock %}
{% block content %}
    <a href="{% url "contact_upload_create" %}">Upload New File</a>
    <br>
    <a href="{% url "contact_generate" %}">Download Fake CSV</a>
    <br>
    <table>
        <tr>
            <th>ID</th>
            <th>FILE</th>
            <th>STATUS</th>
        </tr>
        {% for object in object_list %}
            <tr>
                <td><a href="{{ object.url }}">{{ object.pk }}</a></td>
                <td>{{ object.file }}</td>
                <td>{{ object.status }}</td>
            </tr>
        {% endfor %}
    </table>

{% endblock %}

Now that we have 2 new queues, we want separate Celery workers for them.

Open docker-compose.yml and add 2 blocks under services: celery-long and celery-numerous so the whole file looks like this:

version: '3.3'
services:
  redis:
    image: redis
    ports:
      - "6379:6379"
  rabbitmq:
    image: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=djangito
      - RABBITMQ_DEFAULT_PASS=djangito
      - RABBITMQ_DEFAULT_VHOST=djangito
    ports:
      - "21001:5672"
      - "21002:15672"
  db:
    image: postgres
    environment:
      - POSTGRES_USER=djangito
      - POSTGRES_PASSWORD=djangito
      - POSTGRES_DB=djangito
    ports:
      - "21003:5432"
  web:
    build: .
    restart: always
    command: python manage.py runserver 0.0.0.0:8060
    env_file:
      - .env
    ports:
      - "127.0.0.1:8060:8060"
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq

  celery:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app  worker -Q default -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
  celery-long:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app  worker -Q long -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
  celery-numerous:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app  worker -Q numerous -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq

  celery-beat:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app beat -S redbeat.RedBeatScheduler  --loglevel=DEBUG --pidfile /tmp/celerybeat.pid
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq

Take a closer look at the commands for celery-long and celery-numerous: the -Q parameter tells the worker which queue to consume tasks from. These names must match the ones we pass in the queue parameter of send_task. If no worker consumes messages from a queue, they will pile up and consume space until the broker runs out of disk.
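As an alternative to passing queue= on every send_task call, Celery can route tasks by name. Here is a sketch of what that could look like in djangito/celeryapp.py, assuming we keep the same task names:

app.conf.task_routes = {
    # Route by task name so callers don't have to remember queue names
    'process_uploaded_file': {'queue': 'long'},
    'process_contact_mx_records': {'queue': 'numerous'},
}
# Everything else falls back to the default queue
app.conf.task_default_queue = 'default'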

For the changes to take effect, stop docker-compose by pressing CTRL-C and start it again with docker-compose up.

Go to http://0.0.0.0:8060/, click the "Upload New File" link, upload the demo file we generated, and click the "Upload" button.

You will be redirected to the detail page for this upload. At first it will show status Pending or Processing (depending on whether the Celery task has managed to start working yet).

Processing the CSV file upload with Django Celery

Then, thanks to a little piece of JS that is rendered while processing is not finished, the page will reload and you will see all the contacts from the file.

Processing is finished, but not all contacts have been checked

At this stage the task that processes the file has finished, but the tasks that check the domains' MX records have not.

If you refresh the page again you will see a different picture – the column "MX Found" is full of green and red icons.

Celery tasks have finished processing the data

Let's look at the terminal where docker-compose up is running and make sure the messages went into the right queues.

The latest log records will start with celery-numerous_1, which means that process_contact_mx_records went into the numerous queue.

docker-compose Celery logs

If you scroll up and search for process_uploaded_file, you will see that it went to the long queue and that the container that ran it is called celery-long_1.

Also, every minute we should see our scheduled cache-update task in the logs:

Celery scheduled task log

Let's go to the main page and check that we have data in the cache and that the page is rendered using it.

The last step is to prepare for deployment.

Open the Procfile in the root of the project and add two lines for our Celery workers. They will be used during deployment with Appliku to run separate workers.

workerlong: celery -A djangito.celeryapp:app  worker -Q long -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
workernumerous: celery -A djangito.celeryapp:app  worker -Q numerous -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1

I hope this helped you understand how to work with multiple queues in Celery and why you might need them.