"Celery:Django"의 두 판 사이의 차이
71번째 줄: | 71번째 줄: | ||
# Load task modules from all registered Django app configs. | # Load task modules from all registered Django app configs. | ||
− | app.autodiscover_tasks() # 선택사항. 자동으로 하위 앱의 tasks.py를 탐색한다. | + | app.autodiscover_tasks() # 선택사항. 자동으로 하위 앱의 tasks.py를 탐색한다. 맨 마지막의 tasks를 바꾸면 다른 이름으로 지정할 수도 있다. |
+ | # ex) app.autodiscover_othername() | ||
</syntaxhighlight> | </syntaxhighlight> | ||
|- | |- | ||
152번째 줄: | 153번째 줄: | ||
뭔가 성공적인 메시지가 뜨면 준비 완료. | 뭔가 성공적인 메시지가 뜨면 준비 완료. | ||
− | + | tasks.py의 내용이 바뀌면 재실행 해주어야 한다.(장고의 내용이 바뀌면 재실행하듯.) | |
− | |celery -A 프로젝트명 | + | |
+ | |||
+ | 이후 task를 받으면 <code>[2023-01-12 19:11:47,323: INFO/SpawnPoolWorker-5] child process 4480 calling self.run()</code>과 같이 작업을 띄운다. | ||
+ | |celery -A 프로젝트명 worker --loglevel=info | ||
+ | |- | ||
+ | |윈도우에서. | ||
+ | |윈도우에선 지원을 하지 않는다. 윈도우에선 self.run()은 띄워도 작업이 진행되지 않는다. | ||
+ | 굳이 윈도우에서 쓰고 싶다면... 아래 라이브러리 설치 후 우측과 같이 쓰자. | ||
+ | |||
+ | pip install gevent | ||
+ | |celery -A 프로젝트명 worker -l info -P gevent | ||
|} | |} | ||
이후 운용에선 굳이 셀러리를 시작시켜주는 게 아니라 장고에서 자연스레 돌리는 듯하다.[확인필요] | 이후 운용에선 굳이 셀러리를 시작시켜주는 게 아니라 장고에서 자연스레 돌리는 듯하다.[확인필요] | ||
211번째 줄: | 222번째 줄: | ||
|- | |- | ||
|task 작성 | |task 작성 | ||
− | | | + | |task를 작성할 땐 에러메시지를 곧바로 피드백받기 어려우니.... |
+ | |||
+ | 일단 작성 후 view에서 delay() 없이 실행하며 작동을 확인한 후 진행하자. | ||
|<syntaxhighlight lang="python"> | |<syntaxhighlight lang="python"> | ||
from celery_tutorial.celery import app # celery를 등록한 경로에서. | from celery_tutorial.celery import app # celery를 등록한 경로에서. |
2023년 1월 12일 (목) 19:16 판
1 개요
장고 내에서 셀러리를 사용하기 위한 정보를 담은 문서. 완전 기초론 아무것도 배울 수 없기에 적절한 활용이 가능할 지식을 담아보았다.
1.1 용도
- 장고나 플라스크 등 웹서버에서 오래걸리는 작업을 수행하게 되면 timeout이 떠버려 결과를 받아볼 수가 없다. 그렇다고 웹서버의 timeout을 늘려버리면 비효율적인 자원낭비가 예상되어 함부로 늘릴 수도 없다. 그럴 때 사용하는 celery(샐러리).
- 스케쥴링 작업을 할 때에도 사용한다.
1.2 운용방식
보통 다음과 같이 구성된다.
분류 | 종류 |
---|---|
웹서버 | 장고... |
DB | SQLite, Mysql... |
Message queue
(메시지 브로커) |
Redis나 RabbitMQ |
메시지 브로커라는 외부 서버를 사용하여 작업을 수행한다. 이 메시지 브로커가 셀러리에 feed를 넘긴다.
- 장고가 파이썬 함수를 실행하면
- 셀러리가 해당 작업을 큐에 올린다.(메시지 브로커에 올린다.)
- 샐러리 워커가 일을 수행할 상황이 되면 큐를 가져와 수행한다.(워커별 독자적 처리)
- 워커의 처리결과는 DB 등에 저장하여 활용한다.
2 사전 준비
사람마다 사용하는 방식이 다르다.
다음 방식은 규모가 있는 프로젝트를 다룰 때 사용하기 적합한 방식.
과정 | 설명 | 방법 |
---|---|---|
사전설정 | 장고에서 celery_tutorial이라는 프로젝트가 만들어져 있다 가정하고 진행한다. | pip install celery |
라이브러리 인스턴스 만들기 | settings.py가 있는 디렉토리에 celery.py를 생성 우측과 같이 입력한다.
만약 app.autodiscover_tasks()를 사용하지 않는다면.. 인스턴스를 지정할 때 app = Celery('celery_tutorial', include=['앱이름.tasks']) 형태로 기입하면 포함된 tasks.py만 찾는다. |
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '프로젝트명.settings') # 설정파일 위치.
app = Celery('프로젝트명') # 보통 settings.py 파일이 있는 디렉토리 이름을 넣는다.
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') # 장고의 설정을 읽고, 네임스페이스를 설정한다.
# Load task modules from all registered Django app configs.
app.autodiscover_tasks() # 선택사항. 자동으로 하위 앱의 tasks.py를 탐색한다. 맨 마지막의 tasks를 바꾸면 다른 이름으로 지정할 수도 있다.
# ex) app.autodiscover_othername()
|
장고가 시작될 때 자동실행 | 자동실행을 위해 settings.py가 있는 디렉토리에 __init__.py를 만든다.
위에서 등록한 샐러리 앱을 반영한다. |
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',) # 샐러리
|
확장라이브러리 | 라이브러리를 설치한다.
이거 설치 안하면 ModuleNotFoundError: No module named 'django-db' 에러가 뜬다. |
pip install django-celery-results |
settings.py 설정 | 메시지브로커 주소 등 설정값을 넣어준다. celery.py에서 설정한 namespace가 설정에서 활용된다.(맨 앞에 CELERY_로 시작하는 내용들이 샐러리 설정이 된다.)
CELERY_BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost' # 브로커 주소값.
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
|
샐러리 설정은 위의 celery.py에서 설정한 네임스페이스로 시작한다.INSTALLED_APPS = ( # 이 안에 확장 라이브러리를 등록한다.
'django_celery_results',
)
CELERY_BROKER_URL = 'redis://localhost:6379' # 브로커 주소값. 여기선 redis.
CELERY_RESULT_BACKEND = 'django-db' # 장고 DB를 사용하는 경우. DB백엔드로 redis를 사용한다면 위와 동일하게.
# django-celery-result 백엔드 설정.
CELERY_CAHCE_BACKEND = 'django-cache'
|
마이그레이션 | 확장 라이브러리 사용을 위해 DB테이블 생성. | python manage.py migrate celery_results |
task 등록
task 작성 |
일을 등록한다.
위에서 app.autodiscover_tasks()를 사용했다면 각 앱의 tasks.py 안에 넣어주면 작동한다. (우측은 예시로, 자세한 작성은 아래 '뷰에서의 작업'을 참고하자) 데코레이터가 붙으면 이것이 task로 인식된다. |
사용하고자 하는 앱 안에서 tasks.py로 등록한다.from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def 함수(변수):
명령
return 결과
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def 함수(변수):
명령
return 결과
def debug_task(self): print('Request: {0!r}'.format(self.request)) bind=True 옵션을 사용하면 task 인스턴스를 참조해 자신의 request를 쉽게 가져올 수 있다는데... 찾아볼 필요가 있겠다. bind=True 옵션을 쓰면 무조건 첫번째 인수는 self로 받아야 한다. |
준비 확인
샐러리 실행 |
다른 터미널 창을 열어 따로 실행해주어야 한다.(redis 등 메시지브로커가 실행된 상태여야 한다.)
뭔가 성공적인 메시지가 뜨면 준비 완료. tasks.py의 내용이 바뀌면 재실행 해주어야 한다.(장고의 내용이 바뀌면 재실행하듯.)
|
celery -A 프로젝트명 worker --loglevel=info |
윈도우에서. | 윈도우에선 지원을 하지 않는다. 윈도우에선 self.run()은 띄워도 작업이 진행되지 않는다.
굳이 윈도우에서 쓰고 싶다면... 아래 라이브러리 설치 후 우측과 같이 쓰자. pip install gevent |
celery -A 프로젝트명 worker -l info -P gevent |
이후 운용에선 굳이 셀러리를 시작시켜주는 게 아니라 장고에서 자연스레 돌리는 듯하다.[확인필요]
3 DB조작 작업
과정 | 설명 | 방법 |
---|---|---|
모델 작성 | 링크의 코드를 참조하여 작성하였다. | from django.db import models
class 모델(models.Model):
STATUS_PENDING = 'PENDING'
STATUS_ERROR = 'ERROR'
STATUS_SUCCESS = 'SUCCESS'
STATUSES = ( # 상태를 기입하기 위한 사전작업.
(STATUS_PENDING, 'Pending'),
(STATUS_ERROR, 'Error'),
(STATUS_SUCCESS, 'Success'),
)
input = models.IntegerField()
output = models.IntegerField(blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
status = models.CharField(max_length=8, choices=STATUSES)
message = models.CharField(max_length=110, blank=True)
|
뷰 | delay()는 celery의 업무를 수행하는 apply_async()의 간소판.
delay() 따위가 없으면 그냥 함수와 같이 실행. 카운트다운이나 재실행 등의 자세한 옵션은 apply_async()에 더 자세한 변수를 넣음으로써 가능해진다.(좀 과하게 긴 작업을 제한한다든가.) |
from django.shortcuts import render, redirect
from django.views import View
from .models import 모델 # 모델을 활용한다면.
from .tasks import 필요한일
class CalculationView(View):
def get(self, request):
"""시작하는 템플릿."""
return render(request, 'app/start.html')
def post(self, request):
"""업무 수행."""
n = request.POST['폼에서받을값']
calculation = 모델.objects.create(status=Calculation.STATUS_PENDING, 기타 필요한 변수 기입)
필요한일.delay(calculation.id) # delay는 백그라운드에서 업무를 수행하라는 의미.
return redirect('fibonacci_list')
|
task 작성 | task를 작성할 땐 에러메시지를 곧바로 피드백받기 어려우니....
일단 작성 후 view에서 delay() 없이 실행하며 작동을 확인한 후 진행하자. |
from celery_tutorial.celery import app # celery를 등록한 경로에서.
from .models import Calculation
@app.task(bind=True)
def 필요한일(self, calculation_id):
"""Perform a calculation & update the status"""
calculation = Calculation.objects.get(id=calculation_id)
try:
calculation.output = fib(calculation.input) # 함수를 위에서 정의하여 업무 실행.
calculation.status = Calculation.STATUS_SUCCESS # 업무가 실행되면 수행할 작업.
except Exception as e:
calculation.status = Calculation.STATUS_ERROR # 에러로 끝났음을 알린다.
calculation.message = str(e)[:110] # 에러메시지를 담는다.
calculation.save()
|