Celery:Django

Pywiki
Sam (토론 | 기여)님의 2023년 1월 10일 (화) 18:36 판 (→‎뷰에서의 작업)
둘러보기로 가기 검색하러 가기

1 개요

장고 내에서 셀러리를 사용하기 위한 정보를 담은 문서. 완전 기초론 아무것도 배울 수 없기에 적절한 활용이 가능할 지식을 담아보았다.

1.1 용도

  • 장고나 플라스크 등 웹서버에서 오래걸리는 작업을 수행하게 되면 timeout이 떠버려 결과를 받아볼 수가 없다. 그렇다고 웹서버의 timeout을 늘려버리면 비효율적인 자원낭비가 예상되어 함부로 늘릴 수도 없다. 그럴 때 사용하는 celery(샐러리).
  • 스케쥴링 작업을 할 때에도 사요한다.

1.2 운용방식

보통 다음과 같이 구성된다.

분류 종류
웹서버 장고...
DB SQLite, Mysql...
Message queue

(메시지 브로커)

Redis나 RabbitMQ

메시지 브로커라는 외부 서버를 사용하여 작업을 수행한다. 이 메시지 브로커가 셀러리에 feed를 넘긴다.

즉, 장고가 파이썬 함수를 실행하면 셀러리가 해당 작업을 큐에 올린다. 샐러리는 이를 다시 메시지브로커에.(장고는 다른 작업을 지속할 수 있도록) 작업 결과는 셀러리로 돌아온다.

  1. 웹서버가 큐를 메시지 브로커에 올린다.
  2. 샐러리 워커가 일을 수행할 상황이 되면 큐를 가져와 수행한다.(워커별 독자적 처리)
  3. 워커의 처리결과는 DB에 저장하여 활용한다.

2 기초 사용법

사람마다 사용하는 방식이 다르다.

다음 방식은 규모가 있는 프로젝트를 다룰 때 사용하기 적합한 방식.

과정 설명 방법
사전설정 장고에서 celery_tutorial이라는 프로젝트가 만들어져 있다 가정하고 진행한다.
라이브러리 인스턴스 만들기 settings.py가 있는 디렉토리에 celery.py를 생성 우측과 같이 입력한다.


만약 app.autodiscover_tasks()를 사용하지 않는다면 인스턴스를 지정할 때 app = Celery('celery_tutorial', include=['앱이름.tasks']) 형태로 기입하면 포함된 tasks.py만 찾는다.

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_tutorial.settings')  # 설정파일 위치.

app = Celery('celery_tutorial')  # 보통 settings.py 파일이 있는 디렉토리 이름을 넣는다.

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')  # 장고의 설정을 읽고, 네임스페이스를 설정한다.

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()  # 선택사항. 자동으로 하위 앱의 tasks.py를 탐색한다.
장고가 시작될 때 자동실행 자동실행을 위해 settings.py가 있는 디렉토리에 __init__.py를 만든다.
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)  # 샐러리
확장라이브러리 라이브러리를 설치한다. pip install django-celery-results
settings.py 설정 메시지브로커 주소 등 설정값을 넣어준다. 브로커로 RabbitMQ를 사용한다면 다음과 같이.
CELERY_BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'  # 브로커 주소값.
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
샐러리 설정은 위의 celery.py에서 설정한 네임스페이스로 시작한다.
INSTALLED_APPS = (  # 이 안에 확장 라이브러리를 등록한다.
    'django_celery_results',
    )

CELERY_BROKER_URL = 'redis://localhost:6379'  # 브로커 주소값. 여기선 redis.
CELERY_RESULT_BACKEND = 'django-db'  # 장고 DB를 사용하는 경우. DB백엔드로 redis를 사용한다면 위와 동일하게.
# django-celery-result 백엔드 설정.
CELERY_CAHCE_BACKEND = 'django-cache'
마이그레이션 확장 라이브러리 사용을 위해 DB테이블 생성. python manage.py migrate celery_results
task 등록

task 작성

일을 등록한다.

위에서 app.autodiscover_tasks()를 사용했다면 각 앱의 tasks.py 안에 넣어주면 작동한다.

(우측은 예시로, 자세한 작성은 아래 '뷰에서의 작업'을 참고하자)


데코레이터가 붙으면 이것이 task로 인식된다.

사용하고자 하는 앱 안에서 tasks.py로 등록한다.
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def 함수(변수):
    명령
    return 결과
app.autodiscover_tasks()를 사용하지 않았다면 아래와 같이 사용한다.
from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def 함수(변수):
    명령
    return 결과
@app.task(bind=True)

def debug_task(self): print('Request: {0!r}'.format(self.request))

bind=True 옵션을 사용하면 task 인스턴스를 참조해 자신의 request를 쉽게 가져올 수 있다는데... 찾아볼 필요가 있겠다.

bind=True 옵션을 쓰면 무조건 첫번째 인수는 self로 받아야 한다.

샐러리 실행 [데몬으로 시작하는 법은 따로 있나보다.]

다른 터미널 창을 열어 따로 실행해주어야 한다.

task.py의 내용이 바뀌면 재실행 해주어야 한다.(장고의 내용이 바뀌면 재실행하듯.)

celery -A settings.py가있는폴더.celery worker --loglevel=info

3 뷰에서의 작업

과정 설명 방법
모델 작성 링크의 코드를 참조하여 작성하였다.
from django.db import models

class 모델(models.Model):
    STATUS_PENDING = 'PENDING'
    STATUS_ERROR = 'ERROR'
    STATUS_SUCCESS = 'SUCCESS'
    STATUSES = (  # 상태를 기입하기 위한 사전작업.
        (STATUS_PENDING, 'Pending'),
        (STATUS_ERROR, 'Error'),
        (STATUS_SUCCESS, 'Success'),
    )
    input = models.IntegerField()
    output = models.IntegerField(blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)
    status = models.CharField(max_length=8, choices=STATUSES)
    message = models.CharField(max_length=110, blank=True)
delay()는 celery의 업무를 수행하는 apply_async()의 간소판.

카운트다운이나 재실행 등의 자세한 옵션은 apply_async()에 더 자세한 변수를 넣음으로써 가능해진다.(좀 과하게 긴 작업을 제한한다든가.)

from django.shortcuts import render, redirect
from django.views import View

from .models import 모델  # 모델을 활용한다면.
from .tasks import 필요한일

class CalculationView(View):
    def get(self, request):
        """시작하는 템플릿."""
        return render(request, 'app/start.html')

    def post(self, request):
        """업무 수행."""
        n = request.POST['폼에서받을값']
        calculation = 모델.objects.create(status=Calculation.STATUS_PENDING, 기타 필요한 변수 기입)
        필요한일.delay(calculation.id)  # delay는 백그라운드에서 업무를 수행하라는 의미.

        return redirect('fibonacci_list')
task 작성
from celery_tutorial.celery import app  # celery를 등록한 경로에서.
from .models import Calculation

@app.task(bind=True)
def 필요한일(self, calculation_id):
    """Perform a calculation & update the status"""
    calculation = Calculation.objects.get(id=calculation_id)
    try:
        calculation.output = fib(calculation.input)  # 함수를 위에서 정의하여 업무 실행.
        calculation.status = Calculation.STATUS_SUCCESS  # 업무가 실행되면 수행할 작업.
    except Exception as e:
        calculation.status = Calculation.STATUS_ERROR  # 에러로 끝났음을 알린다.
        calculation.message = str(e)[:110]  # 에러메시지를 담는다.
    calculation.save()