Celery:Django
1 개요편집
장고 내에서 셀러리를 사용하기 위한 정보를 담은 문서. 완전 기초론 아무것도 배울 수 없기에 적절한 활용이 가능할 지식을 담아보았다.
1.1 용도편집
- 장고나 플라스크 등 웹서버에서 오래걸리는 작업을 수행하게 되면 timeout이 떠버려 결과를 받아볼 수가 없다. 그렇다고 웹서버의 timeout을 늘려버리면 비효율적인 자원낭비가 예상되어 함부로 늘릴 수도 없다. 그럴 때 사용하는 celery(샐러리).
- 스케쥴링 작업을 할 때에도 사용한다.
1.2 운용방식편집
보통 다음과 같이 구성된다.
분류 | 종류 |
---|---|
웹서버 | 장고... |
DB | SQLite, Mysql... |
Message queue
(메시지 브로커) |
Redis나 RabbitMQ |
메시지 브로커라는 외부 서버를 사용하여 작업을 수행한다. 이 메시지 브로커가 셀러리에 feed를 넘긴다.
- 장고가 파이썬 함수를 실행하면
- 셀러리가 해당 작업을 큐에 올린다.(메시지 브로커에 올린다.)
- 샐러리 워커가 일을 수행할 상황이 되면 큐를 가져와 수행한다.(워커별 독자적 처리)
- 워커의 처리결과는 DB 등에 저장하여 활용한다.
2 사전 준비편집
사람마다 사용하는 방식이 다르다.
다음 방식은 규모가 있는 프로젝트를 다룰 때 사용하기 적합한 방식.
과정 | 설명 | 방법 |
---|---|---|
사전설정 | 장고에서 celery_tutorial이라는 프로젝트가 만들어져 있다 가정하고 진행한다. | pip install celery |
라이브러리 인스턴스 만들기 | settings.py가 있는 디렉토리에 celery.py를 생성 우측과 같이 입력한다.
만약 app.autodiscover_tasks()를 사용하지 않는다면.. 인스턴스를 지정할 때 app = Celery('celery_tutorial', include=['앱이름.tasks']) 형태로 기입하면 포함된 tasks.py만 찾는다. |
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '프로젝트명.settings') # 설정파일 위치.
app = Celery('프로젝트명') # 보통 settings.py 파일이 있는 디렉토리 이름을 넣는다.
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') # 장고의 설정을 읽고, 네임스페이스를 설정한다.
# Load task modules from all registered Django app configs.
app.autodiscover_tasks() # 선택사항. 자동으로 하위 앱의 tasks.py를 탐색한다. 맨 마지막의 tasks를 바꾸면 다른 이름으로 지정할 수도 있다.
# ex) app.autodiscover_othername()
|
장고가 시작될 때 자동실행 | 자동실행을 위해 settings.py가 있는 디렉토리에 __init__.py를 만든다.
위에서 등록한 샐러리 앱을 반영한다. |
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',) # 샐러리
|
확장라이브러리 | 라이브러리를 설치한다.
이거 설치 안하면 ModuleNotFoundError: No module named 'django-db' 에러가 뜬다. |
pip install django-celery-results |
settings.py 설정 | 메시지브로커 주소 등 설정값을 넣어준다. celery.py에서 설정한 namespace가 설정에서 활용된다.(맨 앞에 CELERY_로 시작하는 내용들이 샐러리 설정이 된다.)
CELERY_BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost' # 브로커 주소값.
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
|
샐러리 설정은 위의 celery.py에서 설정한 네임스페이스로 시작한다.INSTALLED_APPS = ( # 이 안에 확장 라이브러리를 등록한다.
'django_celery_results',
)
CELERY_BROKER_URL = 'redis://localhost:6379' # 브로커 주소값. 여기선 redis.
CELERY_RESULT_BACKEND = 'django-db' # 장고 DB를 사용하는 경우. DB백엔드로 redis를 사용한다면 위와 동일하게.
# django-celery-result 백엔드 설정.
CELERY_CAHCE_BACKEND = 'django-cache'
|
마이그레이션 | 확장 라이브러리 사용을 위해 DB테이블 생성. | python manage.py migrate celery_results |
task 등록
task 작성 |
일을 등록한다.
위에서 app.autodiscover_tasks()를 사용했다면 각 앱의 tasks.py 안에 넣어주면 작동한다. (우측은 예시로, 자세한 작성은 아래 '뷰에서의 작업'을 참고하자) 데코레이터가 붙으면 이것이 task로 인식된다. |
사용하고자 하는 앱 안에서 tasks.py로 등록한다.from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def 함수(변수):
명령
return 결과
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def 함수(변수):
명령
return 결과
def debug_task(self): print('Request: {0!r}'.format(self.request)) bind=True 옵션을 사용하면 task 인스턴스를 참조해 자신의 request를 쉽게 가져올 수 있다는데... 찾아볼 필요가 있겠다. bind=True 옵션을 쓰면 무조건 첫번째 인수는 self로 받아야 한다. |
준비 확인
샐러리 실행 |
다른 터미널 창을 열어 따로 실행해주어야 한다.(redis 등 메시지브로커가 실행된 상태여야 한다.)
뭔가 성공적인 메시지가 뜨면 준비 완료. tasks.py의 내용이 바뀌면 재실행 해주어야 한다.(장고의 내용이 바뀌면 재실행하듯.)
|
celery -A 프로젝트명 worker --loglevel=info |
2.1 운영체제별 실행편집
장고 서버 뿐 아니라 셀러리도 따로 실행해주어야 작업이 수행된다.
의도 | 설명 | 방법 | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
윈도우에서 | 윈도우에선 지원을 하지 않는다. 윈도우에선 self.run()은 띄워도 작업이 진행되지 않는다.
굳이 윈도우에서 쓰고 싶다면... 아래 라이브러리 설치 후 우측과 같이 쓰자. pip install gevent |
celery -A 프로젝트명 worker -l info -P gevent | ||||||||||||||
리눅스에서 | &를 붙여도 백그라운드 실행이 안되고 일반 실행과 동일하게 실행된다;;
보통 서비스 중에 프로세스가 죽는 등의 문제에 대응하기 위해 데몬화 하여 실행하는데, 여기에선 단순 백그라운드 실행을 안내한다.
|
|
3 DB조작 작업편집
과정 | 설명 | 방법 |
---|---|---|
모델 작성 | 링크의 코드를 참조하여 작성하였다. | from django.db import models
class 모델(models.Model):
STATUS_PENDING = 'PENDING'
STATUS_ERROR = 'ERROR'
STATUS_SUCCESS = 'SUCCESS'
STATUSES = ( # 상태를 기입하기 위한 사전작업.
(STATUS_PENDING, 'Pending'),
(STATUS_ERROR, 'Error'),
(STATUS_SUCCESS, 'Success'),
)
input = models.IntegerField()
output = models.IntegerField(blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
status = models.CharField(max_length=8, choices=STATUSES)
message = models.CharField(max_length=110, blank=True)
|
뷰 | delay()는 celery의 업무를 수행하는 apply_async()의 간소판.
delay() 따위가 없으면 그냥 함수와 같이 실행. 카운트다운이나 재실행 등의 자세한 옵션은 apply_async()에 더 자세한 변수를 넣음으로써 가능해진다.(좀 과하게 긴 작업을 제한한다든가.) |
from django.shortcuts import render, redirect
from django.views import View
from .models import 모델 # 모델을 활용한다면.
from .tasks import 필요한일
class CalculationView(View):
def get(self, request):
"""시작하는 템플릿."""
return render(request, 'app/start.html')
def post(self, request):
"""업무 수행."""
n = request.POST['폼에서받을값']
calculation = 모델.objects.create(status=Calculation.STATUS_PENDING, 기타 필요한 변수 기입)
필요한일.delay(calculation.id) # delay는 백그라운드에서 업무를 수행하라는 의미.
return redirect('fibonacci_list')
|
task 작성 | task를 작성할 땐 에러메시지를 곧바로 피드백받기 어려우니....
일단 작성 후 view에서 delay() 없이 실행하며 작동을 확인한 후 진행하자. |
from celery_tutorial.celery import app # celery를 등록한 경로에서.
from .models import Calculation
@app.task(bind=True)
def 필요한일(self, calculation_id):
"""Perform a calculation & update the status"""
calculation = Calculation.objects.get(id=calculation_id)
try:
calculation.output = fib(calculation.input) # 함수를 위에서 정의하여 업무 실행.
calculation.status = Calculation.STATUS_SUCCESS # 업무가 실행되면 수행할 작업.
except Exception as e:
calculation.status = Calculation.STATUS_ERROR # 에러로 끝났음을 알린다.
calculation.message = str(e)[:110] # 에러메시지를 담는다.
calculation.save()
|
4 이외 팁편집
보통 무언가를 구현할 때 중간과정마다 print함수로 에러지점을 찾아내는데, task에서 실행되는 작업은 다른 프로세스에서 실행되기에 print 내용이 나오지 않는다. 때문에 개발환경에선 delay가 아닌 그냥 함수로 실행하고 개발이 끝난 후 delay 기능을 넣는 편이 좋다.
5 관련 에러편집
5.1 Object of type User is not JSON serializable, Object of type HomeworkSubmit is not JSON serializable편집
일반 파이썬 함수에선 모델을 넣어주어도 제대로 작동하지만, celery로 넘기는 것은 다른 서버로 데이터를 넘기는 것으로, 일반적으로 통용되는 JSON 형태로 넘어간다. 하지만, 장고에서 사용하는 모델은 파이썬과 장고 내에서만 사용되는 것으로, JSON으로 직렬화 작업이 필요한데, 장고의 모델객체, request 따위는 너무 복잡해 직렬화가 되지 않는다. 즉, 모델을 직접 넘겨서 task에서 다룰 수 없다.
=> 직렬화 함수를 짜서 넘길 수도 있지만, 이 역시 상황에 따라 또 추가작업이 필요하기에 id를 넘겨 간접적으로 다루게 하는 게 이상적이다.