воскресенье, 24 февраля 2013 г.

django + celery

1. Подключаем celery и kombu
2. Пишем обработчики для задач
3. Запускаем celeryd:
    >>> python ./manage.py celeryd -v 2 -B -s celery -E -l INFO
4. Стартуем задачи


Каркас

######## settings.py ########

INSTALLED_APPS += [
    'kombu.transport.django',
    'djcelery',
]

BROKER_URL = "django://"

import djcelery
djcelery.setup_loader()


######## tasks.py ########

from celery.task import task

@task
def your_task(*args, **kwargs):
    from time import sleep
    sleep(10)


######## view.py ########

from django.http import HttpResponseRedirect

from django.core.urlresolvers import reverse

from tasks import your_task


@staff_member_required
def start_task(request):
    your_task.apply_async(kwargs={
        'some_data': 'some_value'
    })
    return HttpResponseRedirect(reverse('index'))


Установка собственного статуса для задачи

from celery import current_task

current_task.update_state(
    state='PROGRESS',
        meta={
            'message': 'Выполняюсь'
        }
)


Сваливание логов в качестве статуса задачи

from logging import getLogger, Handler

class StateUpdaterHandler(Handler):
    def __init__(self, *args, **kwrags):
        super(StateUpdaterHandler, self).__init__(*args, **kwrags)

    def emit(self, record):
        if not current_task:
            return

        current_task.update_state(
            state='PROGRESS',
            meta={
                'message': record.getMessage()
            }
        )

@task
def your_task(**kwargs):
    logger = getLogger('loader')
    logger.addHandler(StateUpdaterHandler())
    logger.info('Работаю')


Задача-синглетон 


def single_instance_task(timeout):
    def task_exc(func):
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-task-" + func.__name__

            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)

            result = None

            if acquire_lock():
                try:
                    result = func(*args, **kwargs)
                finally:
                    release_lock()

            return result

        return wrapper

    return task_exc

@task
@single_instance_task(60 * 10)
def your_task(**kwargs):
    pass