Dynamically Update Periodic tasks in Celery

Celery is a popular distributed task queue. It is often used with the Django framework to process heavy computational tasks in the background. You can add a single task to the queue or define periodic tasks. Periodic tasks are automatically scheduled for execution at set times. The most common approach is to define the periodic tasks before the Celery worker is started, for example, a daily cleanup of the database. But what if we would like to define periodic tasks dynamically? Recently, I’ve been working on a web app for uptime monitoring. The service continuously monitors a server and sends an email notification when the server is down. In the web app, the user can add a server address to be monitored and select the interval between requests to the server. How do you dynamically add periodic tasks in Celery? I describe my approach in this article.

Overview

The uptime monitoring idea is simple. The user defines the server address, for example https://github.com, and an interval. At every interval, a request is made to the server, and we store information about the response time and status in the database. Multiple servers can be monitored, each at a different interval. Moreover, the user can add or delete monitored addresses at any time. Here is a sequence diagram of my approach.

Start Django project

In this article, I will create a Django application with simple monitoring functionality to show you my approach to dynamic periodic tasks in Celery and Django. All code for this article is available in a public GitHub repository.

Please set up a new Python virtual environment:

virtualenv dynenv --python=python3.8

source dynenv/bin/activate

We need to install the required packages:

  • django for web app development,
  • djangorestframework, markdown, django-filter for Django Rest Framework,
  • celery, django-celery-beat, gevent, sqlalchemy for background tasks processing,
  • requests to make HTTP requests.

Please install the following packages:

pip install django djangorestframework markdown django-filter celery django-celery-beat gevent sqlalchemy requests

It is good practice to add the required packages to a requirements.txt file.
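For this project, requirements.txt can simply list the packages installed above (versions are left unpinned here; pin the versions you actually install):

django
djangorestframework
markdown
django-filter
celery
django-celery-beat
gevent
sqlalchemy
requests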

We need to initialize the Django project with the django-admin command line tool:

django-admin startproject backend

Please change to the backend directory and create a new app:

python manage.py startapp monitors

We need to set up Django to use the newly generated monitors app. Please update INSTALLED_APPS in the backend/settings.py file:

# the rest of the code ...

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    # 3rd party
    "rest_framework",
    "django_celery_beat",
    # apps
    "monitors",
]

# the rest of the code ...

We added the following to INSTALLED_APPS:

  • rest_framework - a package for faster REST API development,
  • django_celery_beat - a package that provides the PeriodicTask model,
  • monitors - our new app.

Monitor database model

We will need two database models:

  • Monitor - a model for storing information about the monitored address and time interval between checks,
  • MonitorRequest - a model to keep the response time and status.

The backend/monitors/models.py file content:

from django.db import models
from django_celery_beat.models import PeriodicTask


class Monitor(models.Model):

    # monitored endpoint
    endpoint = models.CharField(max_length=1024, blank=False)

    # interval in seconds
    # endpoint will be checked every specified interval
    interval = models.IntegerField(blank=False)

    task = models.OneToOneField(
        PeriodicTask, null=True, blank=True, on_delete=models.SET_NULL
    )

    created_at = models.DateTimeField(auto_now_add=True)


class MonitorRequest(models.Model):

    # endpoint response time in milliseconds
    response_time = models.IntegerField(blank=False)

    response_status = models.IntegerField(blank=False)

    monitor = models.ForeignKey(Monitor, on_delete=models.CASCADE)

    created_at = models.DateTimeField(auto_now_add=True)


Please notice that Monitor has a one-to-one relationship with PeriodicTask.

task = models.OneToOneField(
    PeriodicTask, null=True, blank=True, on_delete=models.SET_NULL
)

The PeriodicTask will be used to inform Celery about task execution and its frequency.

When the end user adds a new server address for monitoring, a Monitor object will be created in the database. The server address will be stored in the endpoint field. The interval field is in seconds.

Each request to the server will be saved in the database as a MonitorRequest object. We will store response time (in milliseconds) and status.
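To get a feel for the models, once migrations are applied (shown later in this article), you can create objects by hand in the Django shell (python manage.py shell); the values below are just examples:

from monitors.models import Monitor, MonitorRequest

# a monitor checked every 60 seconds
monitor = Monitor.objects.create(endpoint="https://github.com", interval=60)

# a single check result: 120 ms, HTTP 200
MonitorRequest.objects.create(
    response_time=120, response_status=200, monitor=monitor
)

Note that a monitor created this way has no PeriodicTask attached yet; that wiring happens in the REST API view described below.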

We will need to add serializers, views, and URLs to make a REST API available for Monitor and MonitorRequest.

Please add a new file serializers.py in the backend/monitors directory:

from rest_framework import serializers

from monitors.models import Monitor, MonitorRequest


class MonitorSerializer(serializers.ModelSerializer):
    class Meta:
        model = Monitor
        read_only_fields = ("id", "created_at")
        fields = (
            "id",
            "created_at",
            "endpoint",
            "interval",
        )


class MonitorRequestSerializer(serializers.ModelSerializer):
    monitor_endpoint = serializers.SerializerMethodField()

    def get_monitor_endpoint(self, obj):
        return obj.monitor.endpoint

    class Meta:
        model = MonitorRequest
        read_only_fields = ("id", "created_at")
        fields = (
            "id",
            "created_at",
            "response_time",
            "response_status",
            "monitor_endpoint",
        )

The serializers simply expose the selected model fields. The next step is to edit backend/monitors/views.py:

import json

from django.db import transaction
from django_celery_beat.models import IntervalSchedule, PeriodicTask
from rest_framework import viewsets
from rest_framework.exceptions import APIException

from monitors.models import Monitor, MonitorRequest
from monitors.serializers import MonitorRequestSerializer, MonitorSerializer


class MonitorViewSet(viewsets.ModelViewSet):

    serializer_class = MonitorSerializer
    queryset = Monitor.objects.all()

    def perform_create(self, serializer):
        try:
            with transaction.atomic():

                instance = serializer.save()
                schedule, created = IntervalSchedule.objects.get_or_create(
                    every=instance.interval,
                    period=IntervalSchedule.SECONDS,
                )

                task = PeriodicTask.objects.create(
                    interval=schedule,
                    name=f"Monitor: {instance.endpoint}",
                    task="monitors.tasks.task_monitor",
                    kwargs=json.dumps(
                        {
                            "monitor_id": instance.id,
                        }
                    ),
                )
                instance.task = task
                instance.save()

        except Exception as e:
            raise APIException(str(e))

    def perform_destroy(self, instance):
        if instance.task is not None:
            instance.task.delete()
        return super().perform_destroy(instance)


class MonitorRequestViewSet(viewsets.ModelViewSet):

    serializer_class = MonitorRequestSerializer
    queryset = MonitorRequest.objects.all()

We have two views. The MonitorRequestViewSet derives from ModelViewSet and doesn’t override any methods; it is a simple CRUD view for MonitorRequest objects.

The MonitorViewSet overrides perform_create(self, serializer) and perform_destroy(self, instance). During monitor creation, a PeriodicTask instance is created. The PeriodicTask instance requires an IntervalSchedule object. The IntervalSchedule defines the time period between task executions.

with transaction.atomic():

    instance = serializer.save()
    
    # create `IntervalSchedule` object
    schedule, created = IntervalSchedule.objects.get_or_create(
        every=instance.interval,
        period=IntervalSchedule.SECONDS,
    )

    # create `PeriodicTask` object
    task = PeriodicTask.objects.create(
        interval=schedule,
        name=f"Monitor: {instance.endpoint}",
        task="monitors.tasks.task_monitor",
        kwargs=json.dumps(
            {
                "monitor_id": instance.id,
            }
        ),
    )
    # save task in monitor object
    instance.task = task
    instance.save()

During the PeriodicTask object creation, we pass the task’s dotted path monitors.tasks.task_monitor and define its kwargs. We will implement task_monitor in a moment.
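The view above covers creating and deleting monitors. If you also want a changed interval on an existing monitor to take effect, you could override perform_update(self, serializer) in MonitorViewSet in a similar way. A minimal sketch (this method is not part of the article’s code):

def perform_update(self, serializer):
    with transaction.atomic():
        instance = serializer.save()
        if instance.task is not None:
            # keep the PeriodicTask schedule in sync with the new interval
            schedule, created = IntervalSchedule.objects.get_or_create(
                every=instance.interval,
                period=IntervalSchedule.SECONDS,
            )
            instance.task.interval = schedule
            instance.task.save()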

The last step is to define urls.py in the backend/monitors:


from django.urls import re_path
from rest_framework.routers import DefaultRouter

from monitors.views import MonitorRequestViewSet, MonitorViewSet

router = DefaultRouter()
router.register(r"monitors", MonitorViewSet)
router.register(r"requests", MonitorRequestViewSet)
monitors_urlpatterns = router.urls

We used the DefaultRouter from Django REST Framework. DRF will generate a browsable API for monitors and requests.
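For reference, the registered router generates routes like these (plus a browsable API root at /):

/monitors/         GET (list), POST (create)
/monitors/<pk>/    GET, PUT, PATCH, DELETE
/requests/         GET (list), POST (create)
/requests/<pk>/    GET, PUT, PATCH, DELETE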

We need to add monitors_urlpatterns in the backend/backend/urls.py to make them available in the Django application.

from django.contrib import admin
from django.urls import path

from monitors.urls import monitors_urlpatterns

urlpatterns = [
    path("admin/", admin.site.urls),
]

urlpatterns += monitors_urlpatterns

Please make migrations and apply them:

# please run in the backend directory

python manage.py makemigrations

python manage.py migrate

Celery configuration

We need to configure the Celery framework. Please add a new file celery.py in the backend/backend directory:

import os
import sys

from celery import Celery

CURRENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, CURRENT_DIR)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")

app = Celery("backend")

app.config_from_object("django.conf:settings", namespace="CELERY")

app.autodiscover_tasks()

In the backend/backend/settings.py please add the Celery configuration:


# the rest of the code ...

# celery broker and results in sqlite
CELERY_BROKER_URL = "sqla+sqlite:///celery.sqlite"
CELERY_RESULT_BACKEND = "db+sqlite:///celery.sqlite"

We will use SQLite as the broker and results backend.

This is an example project showing how to use PeriodicTask, so broker and results backend performance is out of scope for this article.
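If you already have Redis running, you could use it instead; the configuration below assumes Redis on its default localhost:6379 and requires pip install redis:

# alternative: celery broker and results in Redis
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"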

We have the Celery configuration completed.

Background task

Please add the tasks.py file in the backend/monitors directory:


import requests
from celery import shared_task

from monitors.models import Monitor, MonitorRequest


@shared_task(bind=True)
def task_monitor(self, monitor_id):

    try:

        monitor = Monitor.objects.get(pk=monitor_id)

        response = requests.get(monitor.endpoint, timeout=60)

        MonitorRequest.objects.create(
            response_time=int(response.elapsed.total_seconds() * 1000),
            response_status=response.status_code,
            monitor=monitor,
        )

    except Exception as e:
        print(str(e), type(e))

The task_monitor(self, monitor_id) function has the @shared_task decorator. It accepts monitor_id as an argument, which is passed in the kwargs field when the PeriodicTask is created.

The task_monitor sends a GET request to the monitored server and saves the response time and status. It is a simplified version of the task_monitor used in my uptime monitoring service.
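You can also trigger the task once, outside the schedule, for example from the Django shell (assuming a monitor with ID 1 exists in the database):

from monitors.tasks import task_monitor

# queue a single, one-off execution on the Celery worker
task_monitor.delay(monitor_id=1)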

Run Django and Celery

We are ready to play with our web application. You will need three terminals. Please start the Django development server in the first one:

python manage.py runserver

In the second terminal, please start the Celery worker:

celery -A backend worker --loglevel=info -P gevent --concurrency 1 -E

In the third terminal, please start Celery beat:

celery -A backend beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval 10

The Celery beat service uses the DatabaseScheduler from the django-celery-beat package. The beat service reads scheduled tasks from the database. Tasks defined with PeriodicTask are persistent; they will still be available after the Celery worker and beat restart.

Please open your (favorite) web browser and enter the address http://127.0.0.1:8000. You should see the browsable API automatically generated by DRF.

If you have problems or need help, please create a GitHub issue. We will try to help you! You won’t be alone.

Please open the monitors API at http://127.0.0.1:8000/monitors/. The list of monitors should be empty; let’s add the first monitor. Please fill out the form and click the POST button.
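If you prefer the command line to the browsable form, the equivalent request can be sent with curl (the endpoint and interval values below are just examples):

curl -X POST http://127.0.0.1:8000/monitors/ \
    -H "Content-Type: application/json" \
    -d '{"endpoint": "https://github.com", "interval": 60}'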

Please wait some time for results; you should see messages like these in the Celery beat terminal:

[2022-10-17 12:05:59,887: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:06:59,887: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:07:59,909: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:08:59,909: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:09:59,914: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)

Example output for Celery worker terminal:

[2022-10-17 12:08:00,644: INFO/MainProcess] Task monitors.tasks.task_monitor[e8ea92d4-e834-4bdf-8f4a-af6ee0601b55] received
[2022-10-17 12:08:01,304: INFO/MainProcess] Task monitors.tasks.task_monitor[e8ea92d4-e834-4bdf-8f4a-af6ee0601b55] succeeded in 0.65664959300193s: None
[2022-10-17 12:09:00,101: INFO/MainProcess] Task monitors.tasks.task_monitor[8c597c74-4917-4f3e-894f-a6ae117fc0f3] received
[2022-10-17 12:09:01,909: INFO/MainProcess] Task monitors.tasks.task_monitor[8c597c74-4917-4f3e-894f-a6ae117fc0f3] succeeded in 1.8063794959998631s: None
[2022-10-17 12:10:00,604: INFO/MainProcess] Task monitors.tasks.task_monitor[739f9227-4b63-409d-9d72-57720c2da5f0] received
[2022-10-17 12:10:00,873: INFO/MainProcess] Task monitors.tasks.task_monitor[739f9227-4b63-409d-9d72-57720c2da5f0] succeeded in 0.2661438309987716s: None

Please open the requests API at http://127.0.0.1:8000/requests/. At the beginning, you will see only an empty list.

After some time, it will be filled with request data.

You can stop the monitoring task by deleting the monitor. Please open http://127.0.0.1:8000/monitors/1/ (where 1 is the monitor ID) and click the Delete button.
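The same can be done from the command line with curl:

curl -X DELETE http://127.0.0.1:8000/monitors/1/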

You should see that no more requests are produced. The monitor and its associated PeriodicTask object have been removed. The code for this article is available in the GitHub repository.

To check periodic tasks, you can open the Django Admin Panel at http://127.0.0.1:8000/admin.

Summary

Celery is a great task queue. You can dynamically create or delete periodic tasks with the django-celery-beat package without restarting Celery. What is more, the PeriodicTask object allows you to dynamically change the interval values and pause the task, as sketched below. These features were very helpful for me while implementing the uptime monitoring service.
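For example, pausing a monitor without deleting it boils down to flipping the enabled flag on its PeriodicTask; a quick sketch for the Django shell (assuming a monitor with ID 1 exists and has a task attached):

from monitors.models import Monitor

monitor = Monitor.objects.get(pk=1)
monitor.task.enabled = False  # set back to True to resume
monitor.task.save()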

