Celery:Django

Pywiki
Sam (토론 | 기여)님의 2023년 1월 11일 (수) 10:57 판
둘러보기로 가기 검색하러 가기

1 개요

장고 내에서 셀러리를 사용하기 위한 정보를 담은 문서. 완전 기초론 아무것도 배울 수 없기에 적절한 활용이 가능할 지식을 담아보았다.

1.1 용도

  • 장고나 플라스크 등 웹서버에서 오래걸리는 작업을 수행하게 되면 timeout이 떠버려 결과를 받아볼 수가 없다. 그렇다고 웹서버의 timeout을 늘려버리면 비효율적인 자원낭비가 예상되어 함부로 늘릴 수도 없다. 그럴 때 사용하는 celery(샐러리).
  • 스케쥴링 작업을 할 때에도 사용한다.

1.2 운용방식

보통 다음과 같이 구성된다.

분류 종류
웹서버 장고...
DB SQLite, Mysql...
Message queue

(메시지 브로커)

Redis나 RabbitMQ

메시지 브로커라는 외부 서버를 사용하여 작업을 수행한다. 이 메시지 브로커가 셀러리에 feed를 넘긴다.

  1. 장고가 파이썬 함수를 실행하면
  2. 셀러리가 해당 작업을 큐에 올린다.(메시지 브로커에 올린다.)
  3. 샐러리 워커가 일을 수행할 상황이 되면 큐를 가져와 수행한다.(워커별 독자적 처리)
  4. 워커의 처리결과는 DB 등에 저장하여 활용한다.

2 사전 준비

사람마다 사용하는 방식이 다르다.

다음 방식은 규모가 있는 프로젝트를 다룰 때 사용하기 적합한 방식.

과정 설명 방법
사전설정 장고에서 celery_tutorial이라는 프로젝트가 만들어져 있다 가정하고 진행한다.
라이브러리 인스턴스 만들기 settings.py가 있는 디렉토리에 celery.py를 생성 우측과 같이 입력한다.



만약 app.autodiscover_tasks()를 사용하지 않는다면..

인스턴스를 지정할 때 app = Celery('celery_tutorial', include=['앱이름.tasks']) 형태로 기입하면 포함된 tasks.py만 찾는다.

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '프로젝트명.settings')  # 설정파일 위치.

app = Celery('프로젝트명')  # 보통 settings.py 파일이 있는 디렉토리 이름을 넣는다.

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')  # 장고의 설정을 읽고, 네임스페이스를 설정한다.

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()  # 선택사항. 자동으로 하위 앱의 tasks.py를 탐색한다.
장고가 시작될 때 자동실행 자동실행을 위해 settings.py가 있는 디렉토리에 __init__.py를 만든다.

위에서 등록한 샐러리 앱을 반영한다.

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)  # 샐러리
확장라이브러리 라이브러리를 설치한다.[선택사항일까. 구현해보고 진행해보자.] pip install django-celery-results
settings.py 설정 메시지브로커 주소 등 설정값을 넣어준다. 브로커로 RabbitMQ를 사용한다면 다음과 같이.
CELERY_BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'  # 브로커 주소값.
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
샐러리 설정은 위의 celery.py에서 설정한 네임스페이스로 시작한다.
INSTALLED_APPS = (  # 이 안에 확장 라이브러리를 등록한다.
    'django_celery_results',
    )

CELERY_BROKER_URL = 'redis://localhost:6379'  # 브로커 주소값. 여기선 redis.
CELERY_RESULT_BACKEND = 'django-db'  # 장고 DB를 사용하는 경우. DB백엔드로 redis를 사용한다면 위와 동일하게.
# django-celery-result 백엔드 설정.
CELERY_CAHCE_BACKEND = 'django-cache'
마이그레이션 확장 라이브러리 사용을 위해 DB테이블 생성. python manage.py migrate celery_results
task 등록

task 작성

일을 등록한다.

위에서 app.autodiscover_tasks()를 사용했다면 각 앱의 tasks.py 안에 넣어주면 작동한다.

(우측은 예시로, 자세한 작성은 아래 '뷰에서의 작업'을 참고하자)

데코레이터가 붙으면 이것이 task로 인식된다.

사용하고자 하는 앱 안에서 tasks.py로 등록한다.
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def 함수(변수):
    명령
    return 결과
app.autodiscover_tasks()를 사용하지 않았다면 아래와 같이 사용한다.
from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def 함수(변수):
    명령
    return 결과
@app.task(bind=True)

def debug_task(self): print('Request: {0!r}'.format(self.request))

bind=True 옵션을 사용하면 task 인스턴스를 참조해 자신의 request를 쉽게 가져올 수 있다는데... 찾아볼 필요가 있겠다.

bind=True 옵션을 쓰면 무조건 첫번째 인수는 self로 받아야 한다.

준비 확인

샐러리 실행

[데몬으로 시작하는 법은 따로 있나보다.]

다른 터미널 창을 열어 따로 실행해주어야 한다.

task.py의 내용이 바뀌면 재실행 해주어야 한다.(장고의 내용이 바뀌면 재실행하듯.)

celery -A settings.py가있는폴더.celery worker --loglevel=info

3 뷰에서의 작업

과정 설명 방법
모델 작성 링크의 코드를 참조하여 작성하였다.
from django.db import models

class 모델(models.Model):
    STATUS_PENDING = 'PENDING'
    STATUS_ERROR = 'ERROR'
    STATUS_SUCCESS = 'SUCCESS'
    STATUSES = (  # 상태를 기입하기 위한 사전작업.
        (STATUS_PENDING, 'Pending'),
        (STATUS_ERROR, 'Error'),
        (STATUS_SUCCESS, 'Success'),
    )
    input = models.IntegerField()
    output = models.IntegerField(blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)
    status = models.CharField(max_length=8, choices=STATUSES)
    message = models.CharField(max_length=110, blank=True)
delay()는 celery의 업무를 수행하는 apply_async()의 간소판.

카운트다운이나 재실행 등의 자세한 옵션은 apply_async()에 더 자세한 변수를 넣음으로써 가능해진다.(좀 과하게 긴 작업을 제한한다든가.)

from django.shortcuts import render, redirect
from django.views import View

from .models import 모델  # 모델을 활용한다면.
from .tasks import 필요한일

class CalculationView(View):
    def get(self, request):
        """시작하는 템플릿."""
        return render(request, 'app/start.html')

    def post(self, request):
        """업무 수행."""
        n = request.POST['폼에서받을값']
        calculation = 모델.objects.create(status=Calculation.STATUS_PENDING, 기타 필요한 변수 기입)
        필요한일.delay(calculation.id)  # delay는 백그라운드에서 업무를 수행하라는 의미.

        return redirect('fibonacci_list')
task 작성
from celery_tutorial.celery import app  # celery를 등록한 경로에서.
from .models import Calculation

@app.task(bind=True)
def 필요한일(self, calculation_id):
    """Perform a calculation & update the status"""
    calculation = Calculation.objects.get(id=calculation_id)
    try:
        calculation.output = fib(calculation.input)  # 함수를 위에서 정의하여 업무 실행.
        calculation.status = Calculation.STATUS_SUCCESS  # 업무가 실행되면 수행할 작업.
    except Exception as e:
        calculation.status = Calculation.STATUS_ERROR  # 에러로 끝났음을 알린다.
        calculation.message = str(e)[:110]  # 에러메시지를 담는다.
    calculation.save()