Celery는 Django의 비동기 & 백그라운드 작업을 효율적으로 처리하는 도구 이다.
주요기능
- 웹 요청과 독립적으로 무거운 작업 처리 가능
- Django ORM과의 연계로 데이터베이스 처리 최적화
- celery-beat를 사용해 주기적 작업 관리 가능
- 분산 처리로 서버 부하 감소 및 성능 최적화
- 태스크 체인을 활용한 비동기 워크플로우 구현 가능
- Flower 같은 모니터링 도구를 활용해 작업 추적 가능
1. Celery 설치
pip install celery
2. Celery 설정 추가
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
3. Celery 파일 추가
import os
from celery import Celery
# Django의 기본 settings 모듈을 Celery 설정으로 사용
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celery_test.settings")
app = Celery("celery_test")
# 설정 파일을 Celery에 로드
app.config_from_object("django.conf:settings", namespace="CELERY")
# Django의 모든 등록된 앱에서 task 자동 검색
app.autodiscover_tasks()
4. init 등록
# __init__.py 등록
from .celery import app as celery_app
__all__ = ("celery_app",)
5. Celery 기능
- 기본적인 작업 큐(Task Queue)
- 주기적 작업(Periodic Tasks)
- 체인과 그룹(Chains & Groups)
- 비동기 결과 백엔드(Async Results & Backends)
- 리트라이(Retries) 및 예외 처리
- 비동기 워크플로우(Workflows)
- 모니터링(Flower, Prometheus 등)
import os
from celery import Celery
# Django settings 모듈 설정
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celery_test.settings")
app = Celery("celery_test")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# 기본 작업 큐(Task Queue)
from celery import shared_task
@shared_task
def add(x, y):
return x + y
# 주기적 작업(Periodic Tasks)
from celery.schedules import crontab
app.conf.beat_schedule = {
'say_hello_every_minute': {
'task': 'myapp.tasks.say_hello',
'schedule': crontab(minute='*/1'), # 1분마다 실행
},
}
@shared_task
def say_hello():
print("Hello, Celery!")
return "Hello, Celery!"
# 체인과 그룹(Chains & Groups)
from celery import chain, group
@shared_task
def multiply(x, y):
return x * y
@shared_task
def subtract(x, y):
return x - y
# 체인 예제
chain_result = chain(add.s(4, 6) | multiply.s(2) | subtract.s(5))()
# 그룹 예제
group_result = group(add.s(2, 3), multiply.s(4, 5), subtract.s(10, 3))()
# 비동기 결과 백엔드(Async Results & Backends)
from celery.result import AsyncResult
def get_task_result(task_id):
result = AsyncResult(task_id)
return result.status, result.result
# 리트라이(Retries) 및 예외 처리
from celery.exceptions import Retry
import random
@shared_task(bind=True, max_retries=3)
def unstable_task(self):
if random.choice([True, False]):
raise self.retry(exc=Exception("Temporary Failure"))
return "Task Succeeded"
# 비동기 워크플로우(Workflows)
@app.task(bind=True)
def complex_workflow(self, x, y):
task_chain = (add.s(x, y) | multiply.s(2) | subtract.s(5))()
return task_chain
# 모니터링(Flower 설정)
app.conf.update(
CELERY_FLOWER_URL="http://localhost:5555"
)
'Python > Django' 카테고리의 다른 글
[python] Django postman 사용 (0) | 2024.08.14 |
---|---|
VScode Windows Django 환경 세팅 (0) | 2024.08.01 |
[Django] DRF 로그인, 로그아웃, 회원가입 REST API 만들기 (0) | 2023.08.11 |
[Django Error] importError: django.conf.url 호출에러 (0) | 2023.08.07 |