Посібник з Python Celery: маленький інструмент для великих можливостей

11 хв. читання

Celery — не новинка для спільноти Python. На певному етапі роботи вам може знадобитись фонова обробка завдань. Тут на допомогу приходить Celery, який може запускати відкладений або окремий код в ізольованому процесі або навіть на іншому комп'ютері чи сервері.

Інтро

Celery збільшує продуктивність внаслідок запуску частини завдань як фонових процесів на тому ж або іншому сервері. Найчастіше розробники використовують цей інструмент для надсилання електронної пошти. Однак Celery пропонує значно більше можливостей. Детальніше про базові концепції Celery, а також про кращі практики використання Celery у Python розповідаємо у статті.

Основи Celery

У цьому розділі ви дізнаєтесь, як налаштувати ваш проект на використання Celery. Розглянемо також більш конкретний приклад використання Celery з Django.

Спочатку вам треба створити екземпляр Celery, щоб позначити функції Python як Celery task (далі – завдання). Найкраще це робити в окремому файлі, тому що процес запуску Celery обов'язково повинен бути аналогічним до WSGI у Django. Тобто якщо ви створите два екземпляри — Flask та Celery в одному файлі та запустите його, ви отримаєте два екземпляри, але використовуватимете лише один. Якщо ви запустите Celery — процес буде аналогічним.

Приклади використання Celery в Django

Як вже згадувалось раніше, найпоширеніший приклад використання Celery — надсилання електронної пошти. Використаємо його для демонстрації особливостей Celery. Реалізація на Python:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context
 
from myproject.celery import app
 
 
def render_template(template, context):
    engine = Engine.get_default()
 
    tmpl = engine.get_template(template)
 
    return tmpl.render(Context(context))
    
 
@celery_app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
    )

З Celery зменшується тривалість очікування відповіді: процес надсилання пошти відокремлюється від основного коду, відповідального за повернення відповіді.

Найпростіший спосіб виконати завдання — викликати метод delay або функцію, що передбачена декоратором app.task.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

Цим переваги Celery не обмежуються. Наприклад, ми могли б налаштувати здійснення повторних спроб при невдачі.

@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        self.retry(exc=ex)

Тепер завдання буде запускатися повторно після невдалого надсилання листа. До того ж можна контролювати кількість повторних спроб.

Вас може цікавити, чому render_template винесено за межі send_mail. Усе тому, що ми огорнули виклик send_mail у блок try/except, а там, як відомо, повинно бути мінімум коду.

Celery для профі

Заплановані завдання з Celery в Django

Celery дозволяє запускати задачі за допомогою планувальників, на зразок crontab у Linux.

Якщо вам потрібні періодичні завдання, вкажіть прапор –beat перед запуском Celery. В іншому випадку Celery проігнорує планувальник. Наступним кроком буде створення конфігурації, що визначатиме, які задачі будуть виконуватись і коли. Розглянемо приклад:

from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    'monday-statistics-email': {
        'task': 'myproject.apps.statistics.tasks.monday_email',
        'schedule': crontab(day_of_week=1, hour=7),
    },
}

Якщо ви не використовуєте Django, замініть CELERY_BEAT_SCHEDULE на celery_app.conf.beat_schedule. Тут ми визначили одне завдання, яке буде виконуватись кожного понеділка о 7 годині. Завданням можна передавати різні аргументи, на основі яких визначати суть задачі. Метод crontab підтримує синтаксис системного crontab. Наприклад, щоб завдання виконувалось кожні 15 хвилин, вкажіть crontab(minute='*/15').

Відкладене виконання завдань в Celery

Ви можете встановлювати затримку перед виконанням задач у Celery (наприклад, якщо вам треба надсилати сповіщення після певних подій). Для цього стане у пригоді метод apply_async з аргументами eta або countdown.

Аргумент Властивість
eta виконання завдання в точний час
countdown виконання завдання через N секунд

На практиці це має такий вигляд:

from datetime import datetime

send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    countdown=15 * 60
)

send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    eta=datetime(2019, 5, 20, 7, 0)

У першому варіанті лист буде надіслано через 15 хвилин, а у другому — 20 травня о 7 годині ранку.

Налаштування черги у Python Celery

Якщо у вас є декілька воркерів на різних серверах, які використовують спільну чергу повідомлень планування задач, ви можете розподілити Celery. Для цього налаштуйте додаткову чергу для завдання/воркера. Наприклад, надсилання електронного листа — критична частина вашої системи й ви не хочете, щоб будь-які інші завдання впливали на цей процес. Тоді ви можете додати нову чергу, назвавши її, скажімо, mail, і використовувати її для надсилання листів.

CELERY_TASK_ROUTES = {
    'myproject.apps.mail.tasks.send_mail_task': {'queue': 'mail', },
}

Знову ж таки, якщо ви не використовуєте Django, замініть CELERY_TASK_ROUTES на celery_app.conf.task_routes.

Запустіть два окремих celery воркери для черги за замовчуванням та створеної черги:

celery -A myproject worker -l info -Q celery
celery -A myproject worker -l info -Q mail

У першому рядку ми запускаємо воркер для черги за замовчуванням під назвою celery, а у другому — для черги mail. Можна використовувати перший worker і без аргументу -Q. Тоді він застосує усі сконфігуровані черги.

Довготривалі завдання у Python Celery

Іноді доводиться мати справу зі завданнями для проходження записів у БД та виконання певних операцій над ними. При цьому часто розробники забувають про збільшення обсягу даних, через що задачі виконуються довше. Подібні завдання краще визначати так, аби вони могли працювати з частинами даних. Простіше за все — додати offset і обмежити кількість параметрів для завдання. Так ви зможете визначити обсяг частини даних, а курсор — отримати нову порцію даних.

@celery_app.task
def send_good_morning_mail_task(offset=0, limit=100):
    users = User.objects.filter(is_active=True).order_by('id')[offset:offset + limit]
    for user in users:
        send_good_morning_mail(user)

    if len(users) >= limit:
        send_good_morning_mail_task.delay(offset + limit, limit)

Тут наведено лише простий приклад того, як можна реалізувати подібні задачі. Наприкінці завдання ми перевіряємо, скільки користувачів знайдено у БД. Якщо число дорівнює зазначеному ліміту, ми, найімовірніше, отримали дані про нового користувача, які треба обробити. Тому запускаємо завдання знову, але з іншим значенням offset. Якщо ж кількість користувачів менша за ліміт, то ми працюємо з останнім фрагментом даних, тому немає потреби продовжувати. Будьте обережні, адже за такої реалізації вам треба кожен раз мати однаковий порядок записів.

Результати завдань у Celery

Більшість розробників не зберігають отримані результати після виконання завдань. Уявіть, що можна взяти частину коду, присвоїти її завданню та виконати його незалежно, одразу після запиту користувача. Коли нам потрібні результати завдання, ми отримуємо їх назад тільки якщо задача завершена, або ж чекаємо на її завершення. Тоді ми можемо додати результат до загальної відповіді. З таким підходом скорочується час очікування відповіді від сервера, що добре впливає на рейтинг сайту.

Зверніть увагу на описану фічу при виконанні одночасних операцій. Припустимо, існує проект з великою кількістю сервісів та даних користувачів. Щоб визначити найкращий сервіс, ми робимо важкі розрахунки та перевірки. Для їх пришвидшення створюємо завдання для користувача з кожним сервісом, виконуємо їх та збираємо результати для подальшої демонстрації. Простіше за все реалізувати це за допомогою груп задач у Celery.

from celery import group

@celery_app.task
def calculate_service_provider_task(user_id, provider_id):
    user = User.objects.get(pk=user_id)
    provider = ServiceProvider.objects.get(pk=provider_id)

    return calculate_service_provider(user, provider)


@celery_app.task
def find_best_service_provider_for_user(user_id):
    user = User.objects.get(pk=user_id)
    providers = ServiceProvider.objects.related_to_user(user)

    calc_group = group([
        calculate_service_provider_task.s(user.pk, provider.pk)
        for provider in providers
    ]).apply_async()

    return calc_group

Чому ми взагалі виконуємо два завдання? Друге завдання потрібне, щоб сформувати групи задач розрахунків, запустити їх та повернути результат. Окрім того, у другому завданні ви можете організувати фільтрацію (наприклад, сервісів, над якими проводяться обчислення для даного користувача). Усе це може виконуватись, поки Celery зайнята іншою роботою. Коли ж група завдань повернеться, нас буде цікавити результат першого з них, який і буде результатом обчислень.

Поглянемо як усе це виглядає у коді:

def view(request):
    find_job = find_best_service_provider_for_user.delay(request.user.pk)

    # інший код

    calculations_results = find_job.get().join()

    # обробка calculations_results і надсилання відповіді

Корисні поради

Перевага ідентифікаторів

Замість цілих об'єктів можна використовувати ідентифікатори запису у БД. Такий підхід допомагає зменшити розмір черги повідомлень. Але важливо те, що коли завдання виконано, дані у БД можуть бути змінені. Коли у вас є лише ідентифікатори, ви отримуєте тільки свіжі дані, а не застарілі, як при роботі з об'єктами.

Транзакції

Іноді виконане завдання не може знайти об'єкт у БД і виникають проблеми. Чому так трапляється? У Django, наприклад, ви виконуєте задачу після того, як користувач зареєструвався (скажімо, надіслав лист на пошту), а ваші запити огортаються у транзакцію через налаштування у Django. Однак у Celery завдання виконуються швидко, навіть до завершення транзакції. Тому ви, найімовірніше, побачите, що у БД немає даних про користувача (поки що).

Щоб виправити цю ситуацію, загугліть «task transaction implementation». Це перезаписаний метод apply_async у завданні, тобто клас, який не виконує задачу негайно, а встановлює transaction.on_commit.

Висновок

Як бачимо, функціонал і користь Celery не обмежуються надсиланням електронних листів. Ви можете запускати різні завдання одночасно, використовуючи основний процес. А поки ви займаєтесь своїми справами, Celery буде виконувати менші задачі. Налаштовуйте черги, працюйте з фрагментами даних або довготривалими завданнями, визначивши час на їх виконання.

Celery допоможе вам краще планувати роботу, більш ефективно розпоряджатися часом на розробку і працювати над важливими речами, поки групи завдань допомагають з менш значущими питаннями.

Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Codeguida 4.7K
Приєднався: 10 місяців тому
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація