【Python x Django】Djangoによる非同期処理実装(Cerery,Redis)

初めに

DjangoによるWebアプリを開発するにあたって非同期処理実装をCerery,Redisを使用して実現したので、その方法について記載していきます。

この記事で出来るようになること

  • Django上で非同期処理を実装するための環境が構築できるようになる
  • Django上で非同期処理の実装ができるようになる

必要となる知識

  • Linux OS の基本的操作(CUI全般, インストール)
  • Pythonプログラミングの知識
  • Django の基本的知識(プロジェクト作成、設定の編集)

わからなくなったらググれれば大丈夫です。

公式リファレンス

今回の実装環境

  • ubuntu20.04
  • Python 3.8.10
  • Django 3.2.5
  • Celery 5.1.2(Celery
  • redis-server 5.0.7-2(Redis

Celery・Redisの導入

まずはサーバに導入していきます。

#celeryのインストール
root# pip3 install celery
root# pip3 install django-celery-results

#Redisのインストール
root# apt install redis-server
root# pip3 install django-redis

最後に入れたRedisは(リモートディクショナリサーバ)と呼ばれるもの。調べてみると色々な用途に使えそうですが今回は彼にBrokerとして働いてもらいます。
流れとしてはフロントからタスクが発生⇒RedisがそのタスクをCeleryに渡す⇒Celeryがタスクを実行、という感じでしょうか。ちなみにRedis以外だとRabbitMQもBrokerとして使えるみたいですね。

Django・Celeryの連携

それでは検証するためのプロジェクトを作っていきます。

root# django-admin startproject MyProject
root# cd MyProject/
root# python3 manage.py startapp celeryapp
root# tree

MyProject/
├──celeryapp
|   ├── admin.py
|   ├── apps.py
|   ├── __init__.py
|   ├── migrations
|   |   └── __init__.py
|   ├── models.py
|   ├── tests.py
|   └── views.py
├── manage.py
└── MyProject
    ├── asgi.py
    ├── __init__.py
    ├── __pycache__
    |   ├── __init__.cpython-38.pyc
    |   └── settings.cpython-38.pyc
    ├── settings.py
    ├── urls.py
    └── wsgi.py

settings.pyにCeleryの設定を記述していきます。

・・・
INSTALLED_APPS = [
    ・・・
    ・・・
    'django_celery_results',
    'celeryapp', #今回使用するapp
]
・・・
# Celery設定
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/1')
CELERY_RESULT_BACKEND = "django-db"

BrokerとしてRedisを利用できるようにし、ジョブの実行結果をDBに保存するように設定しました。
次にsettings.pyと同じ階層にcelery.pyを作成し以下を記述。

import os
from celery import Celery

# celeryで使うDjangoの設定ファイル(settings.py)を指定
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyProject.settings')

app = Celery('MyProject')

# Djangoのconfigファイルをceleryのconfigとして使う宣言、celery用のconfigファイルを作ってもいい。
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

さらに同じ階層の__init__.pyに以下を追加。

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

振り返りましょう。

MyProject/
├──celeryapp
|   ├── admin.py
|   ├── apps.py
|   ├── __init__.py
|   ├── migrations
|   |   └── __init__.py
|   ├── models.py
|   ├── tests.py
|   └── views.py
├── manage.py
└── MyProject
    ├── asgi.py
    ├── celery.py★
    ├── __init__.py★
    ├── __pycache__
    |   ├── __init__.cpython-38.pyc
    |   └── settings.cpython-38.pyc
    ├── settings.py★
    ├── urls.py
    └── wsgi.py

上記で★を付けたところが追記もしくは新規作成したものです。

task・画面の作成

次にceleryで実行したいタスクを作成します。appの中にtasks.pyを作成しそこに非同期で実施したい処理を書きます。

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def add(x, y):
    print("処理中")
    z = x + y
    time.sleep(10)
    print("処理完了")
    return z

重要なのはデコレータの@shared_taskで、これを関数の前に付けることでceleryのタスクとして認識されます。
あとはこれを実行する画面と接続(urls.py)を編集すればテスト環境が整います。
まずは画面を作成します。

<html lang="ja">
    <head>
      <title>{% block title %}celeryテスト{% endblock %}</title>
      <meta http-equiv="refresh" content="10; URL="> 
    </head>        
    <body>           
      <h1>初心者でも非同期処理がしたい</h1>                
            <p>値を入力</p>
            <form method="post" action="celery">
                {% csrf_token %}
              <tbody>
                <td><input type="text" name="input_a"></td>
                <td>+</td>
                <td><input type="text" name="input_b"></td>
              </tbody>
              <input type="submit" name="add_button" value="計算!">
            </form>    
      <h1>結果!</h1>
            <tbody>        
                <td>出力値:</td>
                <td>{{ result }}</td>
            </tbody>
    </body>
</html>

次にviews.pyを更新します。

from django.shortcuts import render
from django.http import HttpResponse
from django_celery_results.models import TaskResult
from django.template import loader
from .tasks import add

def celery(requests):
    template = loader.get_template('main/celery.html')
    if 'add_button' in requests.POST:
        x = int(requests.POST['input_a'])
        y = int(requests.POST["input_b"])
        task_id = add.delay(x,y)★

    result = list(TaskResult.objects.all().values_list("result",flat=True))
    if len(result) == 0:
        result = [0]
    context = {'result': result[0]}
    return HttpResponse(template.render(context, requests))

少し説明します。
★がついているところですが、非同期処理のtask(関数)を実行する場合は.delayを付けます。

    result = list(TaskResult.objects.all().values_list("result",flat=True))
    if len(result) == 0:
        result = [0]
    context = {'result': result[0]}

この部分ですが、Celeryで処理した結果は専用のテーブル(django_celery_results_taskresult)に保存されます。今回はそこから結果を取得するようにしています。

最後に接続部分を編集

MyProjectのurls.pyを以下のように変更し、

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('celeryapp.urls')),
]

appのurls.pyを以下のように変更します。

from django.urls import path
from . import views

urlpatterns = [
    path('celery', views.celery),
]

今回追加・追記を行ったファイルは下記(★★)になります。

MyProject/
├──celeryapp
|   ├── admin.py
|   ├── apps.py
|   ├── __init__.py
|   ├── migrations
|   |   └── __init__.py
|   ├── models.py
|   ├── templates
|   |   └── main
|   |        └── celery.html★★
|   ├── tests.py
|   ├── urls.py★★
|   ├── tasks.py★★
|   └── views.py★★
├── manage.py
└── MyProject
    ├── asgi.py
    ├── celery.py★
    ├── __init__.py★
    ├── __pycache__
    |   ├── __init__.cpython-38.pyc
    |   └── settings.cpython-38.pyc
    ├── settings.py★
    ├── urls.py
    └── wsgi.py

実行

準備はできたのでテストしてみましょう。

実行するにあたりDBのmigrateを行います。migrateをすることでCeleryのテーブルが自動で作成されます。

root# python3 manage.py migarate

次にCeleryを起動します。Celeryコマンドはmanage.pyがあるフォルダで実行します。

root# celery -A MyProject worker -l info
・・・

[2021-08-08 10:59:43,369: INFO/MainProcess] celery@python ready.

コマンドのcelery -A Site worker -l infoですが、最後の-l infoは出力するログレベルを指定できます。(ERROR等)
コマンドのMyProjectの部分は適宜自分のプロジェクト名に変更してください。

最後にrunserverしてアクセスしてみます。

root# python3 manage.py runserver 0:8000

無事起動したことを確認したらhttp://サーバIPアドレス:8000/celeryへ行きましょう。

このように表示されます。

値を入れて計算を押してみます。

ここが肝ですが、ボタンを押すとページがリロードされます。
今回の処理は計算をしたあと10秒スリープするのですぐには結果が返ってきません、がページが待機状態になったりすることはなくブラウザを閉じても問題ありません。
ボタンを押せば後は勝手に裏でサーバが処理を続けてくれていることが分かります。
少し待つと

結果が返ってきました!

ちなみにサーバ側のCelery画面では

[2021-08-08 15:54:21,537: INFO/MainProcess] Task celeryapp.tasks.add[d76317d6-ad2f-46a8-ba59-ee936dc76797] received
[2021-08-08 15:54:21,538: WARNING/ForkPoolWorker-1] 処理中
[2021-08-08 15:54:21,538: WARNING/ForkPoolWorker-1]

[2021-08-08 15:54:31,544: WARNING/ForkPoolWorker-1] 処理完了
[2021-08-08 15:54:31,544: WARNING/ForkPoolWorker-1]

[2021-08-08 15:54:31,549: INFO/ForkPoolWorker-1] Task celeryapp.tasks.add[d76317d6-ad2f-46a8-ba59-ee936dc76797] succeeded in 10.011916563998966s:

ちゃんと処理が動いていることが確認できます。
またDjangoの管理画面に入るとページが自動で作られ処理内容が確認できます。

終わりに

今回はDjangoで非同期処理を実装する方法について記載しました。

実際にこれを利用したアプリの実装についてはまた別記事で書きたいと思います。

この記事が気に入ったら
フォローしよう

最新情報をお届けします

Twitterでフォローしよう

おすすめの記事