import datetime
import json
import pickle
import re
import time
import traceback
import uuid
from functools import wraps

from huey.backends.dummy import DummySchedule
from huey.exceptions import DataStoreGetException
from huey.exceptions import DataStorePutException
from huey.exceptions import DataStoreTimeout
from huey.exceptions import QueueException
from huey.exceptions import QueueReadException
from huey.exceptions import QueueRemoveException
from huey.exceptions import QueueWriteException
from huey.exceptions import ScheduleAddException
from huey.exceptions import ScheduleReadException
from huey.registry import registry
from huey.utils import EmptyData
from huey.utils import local_to_utc
from huey.utils import wrap_exception


class Huey(object):
    """
    Huey executes tasks by exposing function decorators that cause the function
    call to be enqueued for execution by the consumer.

    Typically your application will only need one Huey instance, but you can
    have as many as you like -- the only caveat is that one consumer process
    must be executed for each Huey instance.

    :param queue: a queue instance, e.g. ``RedisQueue()``
    :param result_store: a place to store results, e.g. ``RedisResultStore()``
    :param schedule: a place to store pending tasks, e.g. ``RedisSchedule()``
    :param events: channel to send events on, e.g. ``RedisEventEmitter()``
    :param store_none: Flag to indicate whether tasks that return ``None``
        should store their results in the result store.
    :param always_eager: Useful for testing, this will execute all tasks
        immediately, without enqueueing them.

    Example usage::

        from huey.api import Huey, crontab
        from huey.backends.redis_backend import RedisQueue, RedisDataStore, RedisSchedule

        queue = RedisQueue('my-app')
        result_store = RedisDataStore('my-app')
        schedule = RedisSchedule('my-app')
        huey = Huey(queue, result_store, schedule)

        # This is equivalent to the previous 4 lines:
        # huey = RedisHuey('my-app', {'host': 'localhost', 'port': 6379})

        @huey.task()
        def slow_function(some_arg):
            # ... do something ...
            return some_arg

        @huey.periodic_task(crontab(minute='0', hour='3'))
        def backup():
            # do a backup every day at 3am
            return
    """
    def __init__(self, queue, result_store=None, schedule=None, events=None,
                 store_none=False, always_eager=False):
        self.queue = queue
        self.result_store = result_store
        # Fall back to a dummy schedule named after the queue when no
        # schedule backend is supplied.
        self.schedule = schedule or DummySchedule(self.queue.name)
        self.events = events
        self.blocking = self.queue.blocking
        self.store_none = store_none
        self.always_eager = always_eager

    def task(self, retries=0, retry_delay=0, retries_as_argument=False,
             include_task=False, name=None):
        """
        Return a decorator that turns a function into an enqueued task.

        Calling the decorated function enqueues a task instead of running
        the function.  The original function stays reachable as
        ``call_local`` on the returned wrapper, and a ``schedule`` helper
        and the generated ``task_class`` are attached to the function.

        :param retries: number of retries stored on each created task.
        :param retry_delay: retry delay stored on each created task.
        :param retries_as_argument: pass the task's ``retries`` value to the
            function as the ``retries`` keyword argument.
        :param include_task: pass the task instance to the function as the
            ``task`` keyword argument.
        :param name: optional name for the generated task class.
        """
        def decorator(func):
            """
            Decorator to execute a function out-of-band via the consumer.
            """
            klass = create_task(
                QueueTask,
                func,
                retries_as_argument,
                name,
                include_task)

            def schedule(args=None, kwargs=None, eta=None, delay=None,
                         convert_utc=True, task_id=None):
                """
                Enqueue the task for execution at ``eta`` or after ``delay``
                seconds (the two are mutually exclusive).
                """
                if delay and eta:
                    raise ValueError('Both a delay and an eta cannot be '
                                     'specified at the same time')
                if delay:
                    eta = (datetime.datetime.now() +
                           datetime.timedelta(seconds=delay))
                if convert_utc and eta:
                    eta = local_to_utc(eta)
                cmd = klass(
                    (args or (), kwargs or {}),
                    execute_time=eta,
                    retries=retries,
                    retry_delay=retry_delay,
                    task_id=task_id)
                return self.enqueue(cmd)

            # These attributes are set before ``wraps`` runs so that they
            # are copied onto ``inner_run`` along with the rest of
            # ``func.__dict__``.
            func.schedule = schedule
            func.task_class = klass

            @wraps(func)
            def inner_run(*args, **kwargs):
                cmd = klass(
                    (args, kwargs),
                    retries=retries,
                    retry_delay=retry_delay)
                return self.enqueue(cmd)

            inner_run.call_local = func
            return inner_run
        return decorator

    def periodic_task(self, validate_datetime, name=None):
        """
        Decorator to execute a function on a specific schedule.

        :param validate_datetime: callable taking a datetime and returning
            whether the task should run at that time, e.g. the function
            returned by :func:`crontab`.
        :param name: optional name for the generated task class.

        The decorated function is returned unchanged apart from the added
        ``task_class``, ``revoke``, ``is_revoked`` and ``restore``
        attributes.
        """
        def decorator(func):
            def method_validate(self, dt):
                return validate_datetime(dt)

            klass = create_task(
                PeriodicQueueTask,
                func,
                task_name=name,
                validate_datetime=method_validate,
            )

            func.task_class = klass

            def _revoke(revoke_until=None, revoke_once=False):
                self.revoke(klass(), revoke_until, revoke_once)
            func.revoke = _revoke

            def _is_revoked(dt=None, peek=True):
                return self.is_revoked(klass(), dt, peek)
            func.is_revoked = _is_revoked

            def _restore():
                return self.restore(klass())
            func.restore = _restore

            return func
        return decorator

    def _wrapped_operation(exc_class):
        """
        Build a decorator that hands any error raised by the wrapped
        function to ``wrap_exception(exc_class)``.
        """
        def decorator(fn):
            @wraps(fn)
            def inner(*args, **kwargs):
                try:
                    return fn(*args, **kwargs)
                except:
                    # Intentionally broad: every failure of a backend
                    # operation is routed through ``wrap_exception``.
                    wrap_exception(exc_class)
            return inner
        return decorator

    @_wrapped_operation(QueueWriteException)
    def _write(self, msg):
        self.queue.write(msg)

    @_wrapped_operation(QueueReadException)
    def _read(self):
        return self.queue.read()

    @_wrapped_operation(QueueRemoveException)
    def _remove(self, msg):
        return self.queue.remove(msg)

    @_wrapped_operation(DataStoreGetException)
    def _get(self, key, peek=False):
        # ``peek`` selects the result store's ``peek`` method instead of
        # its ``get`` method.
        if peek:
            return self.result_store.peek(key)
        else:
            return self.result_store.get(key)

    @_wrapped_operation(DataStorePutException)
    def _put(self, key, value):
        return self.result_store.put(key, value)

    @_wrapped_operation(ScheduleAddException)
    def _add_schedule(self, data, ts):
        if self.schedule is None:
            raise AttributeError('Schedule not specified.')
        self.schedule.add(data, ts)

    @_wrapped_operation(ScheduleReadException)
    def _read_schedule(self, ts):
        if self.schedule is None:
            raise AttributeError('Schedule not specified.')
        return self.schedule.read(ts)

    def emit(self, message):
        """Events should always fail silently."""
        try:
            self.events.emit(message)
        except Exception:
            # Best-effort: emission errors are ignored, but interpreter
            # exit signals (KeyboardInterrupt, SystemExit) still propagate.
            pass

    def enqueue(self, task):
        """
        Write the task to the queue, or execute it immediately when
        ``always_eager`` is set.

        :returns: the task's return value in eager mode; otherwise an
            :class:`AsyncData` when a result store is configured, else
            ``None``.
        """
        if self.always_eager:
            return task.execute()

        self._write(registry.get_message_for_task(task))

        if self.result_store:
            return AsyncData(self, task)

    def dequeue(self):
        """Read one message from the queue and return its task, if any."""
        message = self._read()
        if message:
            return registry.get_task_for_message(message)

    def _format_time(self, dt):
        if dt is None:
            return None
        return time.mktime(dt.timetuple())

    def emit_task(self, status, task, error=False):
        """
        Emit a JSON-encoded event describing ``task``; does nothing when no
        event channel is configured.  When ``error`` is truthy the current
        traceback is included.
        """
        if self.events:
            message_data = {'status': status}
            message_data.update({
                'id': task.task_id,
                'task': type(task).__name__,
                'retries': task.retries,
                'retry_delay': task.retry_delay,
                'execute_time': self._format_time(task.execute_time),
                'error': error})
            if error:
                message_data['traceback'] = traceback.format_exc()
            self.emit(json.dumps(message_data))

    def execute(self, task):
        """
        Run the task and store its pickled result.

        Results are stored only when a result store is configured and the
        task is not periodic.  A ``None`` result is neither stored nor
        processed further unless ``store_none`` is set.

        :raises TypeError: if ``task`` is not a :class:`QueueTask`.
        """
        if not isinstance(task, QueueTask):
            raise TypeError('Unknown object: %s' % task)

        result = task.execute()

        if result is None and not self.store_none:
            return

        if self.result_store and not isinstance(task, PeriodicQueueTask):
            self._put(task.task_id, pickle.dumps(result))

        return result

    def revoke(self, task, revoke_until=None, revoke_once=False):
        """
        Record a revocation for ``task`` in the result store.

        :raises QueueException: if no result store is configured.
        """
        if not self.result_store:
            raise QueueException('A DataStore is required to revoke task')

        serialized = pickle.dumps((revoke_until, revoke_once))
        self._put(task.revoke_id, serialized)

    def restore(self, task):
        self._get(task.revoke_id)  # simply get and delete if there

    def is_revoked(self, task, dt=None, peek=True):
        """
        Report whether ``task`` is currently revoked.

        :param dt: datetime to compare a time-bounded revocation against;
            defaults to the current UTC time, as in :meth:`ready_to_run`.
        :param peek: when false, a "revoke once" marker is consumed via
            :meth:`restore`.
        """
        if not self.result_store:
            return False
        res = self._get(task.revoke_id, peek=True)
        if res is EmptyData:
            return False
        revoke_until, revoke_once = pickle.loads(res)
        if revoke_once:
            # This task *was* revoked for one run, but now it should be
            # restored to normal execution.
            if not peek:
                self.restore(task)
            return True
        if revoke_until is None:
            return True
        if dt is None:
            # Comparing a datetime with None raises TypeError, so fall back
            # to "now" when the caller gave no reference time.
            dt = datetime.datetime.utcnow()
        return revoke_until > dt

    def add_schedule(self, task):
        """Add the task to the schedule, keyed by its execute time."""
        msg = registry.get_message_for_task(task)
        ex_time = task.execute_time or datetime.datetime.fromtimestamp(0)
        self._add_schedule(msg, ex_time)

    def read_schedule(self, ts):
        """Return the tasks the schedule yields for timestamp ``ts``."""
        return [
            registry.get_task_for_message(m)
            for m in self._read_schedule(ts)]

    def flush(self):
        self.queue.flush()

    def ready_to_run(self, cmd, dt=None):
        """
        Return whether ``cmd`` has no execute time or one that is not after
        ``dt`` (default: the current UTC time).
        """
        dt = dt or datetime.datetime.utcnow()
        return cmd.execute_time is None or cmd.execute_time <= dt


class AsyncData(object):
    """
    Handle for the pending result of an enqueued task.

    The result is fetched from the owning :class:`Huey` instance's result
    store, unpickled, and cached on first successful read.
    """
    def __init__(self, huey, task):
        self.huey = huey
        self.task = task

        self._result = EmptyData

    def _get(self):
        # Return the cached value if there is one; otherwise poll the store
        # once and cache whatever was found.
        if self._result is not EmptyData:
            return self._result

        res = self.huey._get(self.task.task_id)
        if res is EmptyData:
            return res

        self._result = pickle.loads(res)
        return self._result

    def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
            revoke_on_timeout=False):
        """
        Fetch the task's result.

        :param blocking: poll until a result is available instead of
            returning ``None`` when there is none yet.
        :param timeout: seconds to wait in blocking mode; a falsy value
            means wait indefinitely.
        :param backoff: factor by which the polling delay grows each round.
        :param max_delay: upper bound on the polling delay, in seconds.
        :param revoke_on_timeout: revoke the task before raising on timeout.
        :raises DataStoreTimeout: when ``timeout`` elapses in blocking mode.
        """
        if not blocking:
            res = self._get()
            if res is not EmptyData:
                return res
        else:
            start = time.time()
            delay = .1
            while self._result is EmptyData:
                if timeout and time.time() - start >= timeout:
                    if revoke_on_timeout:
                        self.revoke()
                    raise DataStoreTimeout
                if delay > max_delay:
                    delay = max_delay
                if self._get() is EmptyData:
                    time.sleep(delay)
                    delay *= backoff

            return self._result

    def revoke(self):
        self.huey.revoke(self.task)

    def restore(self):
        self.huey.restore(self.task)


def with_metaclass(meta, base=object):
    """Create a base class named "NewBase" whose metaclass is ``meta``."""
    return meta("NewBase", (base,), {})


class QueueTaskMetaClass(type):
    def __init__(cls, name, bases, attrs):
        """
        Metaclass to ensure that all task classes are registered
        """
        registry.register(cls)


class QueueTask(with_metaclass(QueueTaskMetaClass)):
    """
    A class that encapsulates the logic necessary to 'do something' given some
    arbitrary data.  When enqueued with the :class:`Huey`, it will be
    stored in a queue for out-of-band execution via the consumer.  See also
    the :meth:`task` decorator, which can be used to automatically
    execute any function out-of-band.

    Example::

    class SendEmailTask(QueueTask):
        def execute(self):
            data = self.get_data()
            send_email(data['recipient'], data['subject'], data['body'])

    huey.enqueue(
        SendEmailTask({
            'recipient': 'somebody@spam.com',
            'subject': 'look at this awesome website',
            'body': 'http://youtube.com'
        })
    )
    """

    def __init__(self, data=None, task_id=None, execute_time=None, retries=0,
                 retry_delay=0):
        self.set_data(data)
        self.task_id = task_id or self.create_id()
        self.revoke_id = 'r:%s' % self.task_id
        self.execute_time = execute_time
        self.retries = retries
        self.retry_delay = retry_delay

    def create_id(self):
        return str(uuid.uuid4())

    def get_data(self):
        return self.data

    def set_data(self, data):
        self.data = data

    def execute(self):
        """Execute any arbitrary code here"""
        raise NotImplementedError

    def __eq__(self, rhs):
        # Check the type first so that comparing against an object without
        # task attributes yields False instead of raising AttributeError.
        return (
            type(self) is type(rhs) and
            self.task_id == rhs.task_id and
            self.execute_time == rhs.execute_time)


class PeriodicQueueTask(QueueTask):
    """
    Task whose id is derived from its class rather than generated per
    instance, so every instance of one periodic task shares the same id.
    """
    def create_id(self):
        return registry.task_to_string(type(self))

    def validate_datetime(self, dt):
        """Validate that the task should execute at the given datetime"""
        return False


def create_task(task_class, func, retries_as_argument=False, task_name=None,
                include_task=False, **kwargs):
    """
    Build a ``task_class`` subclass whose ``execute`` calls ``func``.

    :param task_class: base class for the generated task class.
    :param func: function invoked with the task's stored args and kwargs.
    :param retries_as_argument: pass ``self.retries`` as ``retries``.
    :param task_name: class name; defaults to ``queuecmd_<func name>``.
    :param include_task: pass the task instance as ``task``.
    :param kwargs: extra attributes to set on the generated class.
    """
    def execute(self):
        args, kwargs = self.data or ((), {})
        if retries_as_argument or include_task:
            # Work on a copy: injecting into the stored dict would leave
            # the extra keys (including the task itself) in ``self.data``,
            # which is what gets serialized if the task is enqueued again.
            kwargs = dict(kwargs)
        if retries_as_argument:
            kwargs['retries'] = self.retries
        if include_task:
            kwargs['task'] = self
        return func(*args, **kwargs)

    attrs = {
        'execute': execute,
        '__module__': func.__module__,
        '__doc__': func.__doc__
    }
    attrs.update(kwargs)

    klass = type(
        task_name or 'queuecmd_%s' % (func.__name__),
        (task_class,),
        attrs
    )

    return klass


dash_re = re.compile(r'(\d+)-(\d+)')
every_re = re.compile(r'\*\/(\d+)')


def crontab(month='*', day='*', day_of_week='*', hour='*', minute='*'):
    """
    Convert a "crontab"-style set of parameters into a test function that will
    return True when the given datetime matches the parameters set forth in
    the crontab.

    Acceptable inputs:

    * = every distinct value
    */n = run every "n" times, i.e. hour='*/4' == 0, 4, 8, 12, 16, 20
    m-n = run every time m..n
    m,n = run on m and n

    :raises ValueError: if a number or a range bound is outside the values
        accepted for its field.
    """
    validation = (
        ('m', month, range(1, 13)),
        ('d', day, range(1, 32)),
        ('w', day_of_week, range(7)),
        ('H', hour, range(24)),
        ('M', minute, range(60))
    )
    cron_settings = []

    for (date_str, value, acceptable) in validation:
        settings = set()

        if isinstance(value, int):
            value = str(value)

        for piece in value.split(','):
            if piece == '*':
                settings.update(acceptable)
                continue

            if piece.isdigit():
                piece = int(piece)
                if piece not in acceptable:
                    raise ValueError('%d is not a valid input' % piece)
                settings.add(piece)

            else:
                dash_match = dash_re.match(piece)
                if dash_match:
                    lhs, rhs = map(int, dash_match.groups())
                    if lhs not in acceptable or rhs not in acceptable:
                        raise ValueError('%s is not a valid input' % piece)
                    settings.update(range(lhs, rhs+1))
                    continue

                every_match = every_re.match(piece)
                if every_match:
                    interval = int(every_match.groups()[0])
                    settings.update(acceptable[::interval])
                # NOTE(review): a piece matching none of the forms above is
                # silently ignored -- confirm whether it should raise.

        cron_settings.append(sorted(settings))

    def validate_date(dt):
        _, m, d, H, M, _, w, _, _ = dt.timetuple()

        # fix the weekday to be sunday=0
        w = (w + 1) % 7

        for (date_piece, selection) in zip([m, d, w, H, M], cron_settings):
            if date_piece not in selection:
                return False

        return True

    return validate_date