1. Подключаем celery и kombu
2. Пишем обработчики для задач
3. Запускаем celeryd:
>>> python ./manage.py celeryd -v 2 -B -s celery -E -l INFO
4. Стартуем задачи
INSTALLED_APPS += [
'kombu.transport.django',
'djcelery',
]
BROKER_URL = "django://"
import djcelery
djcelery.setup_loader()
######## tasks.py ########
from celery.task import task
@task
def your_task(*args, **kwargs):
from time import sleep
sleep(10)
######## view.py ########
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from tasks import your_task
@staff_member_required
def start_task(request):
your_task.apply_async(kwargs={
'some_data': 'some_value'
})
return HttpResponseRedirect(reverse('index'))
2. Пишем обработчики для задач
3. Запускаем celeryd:
>>> python ./manage.py celeryd -v 2 -B -s celery -E -l INFO
4. Стартуем задачи
Каркас
######## settings.py ########INSTALLED_APPS += [
'kombu.transport.django',
'djcelery',
]
BROKER_URL = "django://"
import djcelery
djcelery.setup_loader()
######## tasks.py ########
from celery.task import task
@task
def your_task(*args, **kwargs):
from time import sleep
sleep(10)
######## view.py ########
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from tasks import your_task
@staff_member_required
def start_task(request):
your_task.apply_async(kwargs={
'some_data': 'some_value'
})
return HttpResponseRedirect(reverse('index'))
Установка собственного статуса для задачи
from celery import current_task
current_task.update_state(
state='PROGRESS',
meta={
'message': 'Выполняюсь'
}
)
Сваливание логов в качестве статуса задачи
from logging import getLogger, Handler
class StateUpdaterHandler(Handler):
def __init__(self, *args, **kwrags):
super(StateUpdaterHandler, self).__init__(*args, **kwrags)
def emit(self, record):
if not current_task:
return
current_task.update_state(
state='PROGRESS',
meta={
'message': record.getMessage()
}
)
@task
def your_task(**kwargs):
logger = getLogger('loader')
logger.addHandler(StateUpdaterHandler())
logger.info('Работаю')
Задача-синглетон
def single_instance_task(timeout):
def task_exc(func):
def wrapper(*args, **kwargs):
lock_id = "celery-single-task-" + func.__name__
acquire_lock = lambda: cache.add(lock_id, "true", timeout)
release_lock = lambda: cache.delete(lock_id)
result = None
if acquire_lock():
try:
result = func(*args, **kwargs)
finally:
release_lock()
return result
return wrapper
return task_exc
@task
@single_instance_task(60 * 10)
@single_instance_task(60 * 10)
def your_task(**kwargs):
pass