78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
import pickle
|
|
|
|
from huey.exceptions import QueueException
|
|
|
|
|
|
class TaskRegistry(object):
|
|
"""
|
|
A simple Registry used to track subclasses of :class:`QueueTask` - the
|
|
purpose of this registry is to allow translation from queue messages to
|
|
task classes, and vice-versa.
|
|
"""
|
|
_ignore = ['QueueTask', 'PeriodicQueueTask']
|
|
|
|
_registry = {}
|
|
_periodic_tasks = []
|
|
|
|
def task_to_string(self, task):
|
|
return '%s' % (task.__name__)
|
|
|
|
def register(self, task_class):
|
|
klass_str = self.task_to_string(task_class)
|
|
if klass_str in self._ignore:
|
|
return
|
|
|
|
if klass_str not in self._registry:
|
|
self._registry[klass_str] = task_class
|
|
|
|
# store an instance in a separate list of periodic tasks
|
|
if hasattr(task_class, 'validate_datetime'):
|
|
self._periodic_tasks.append(task_class())
|
|
|
|
def unregister(self, task_class):
|
|
klass_str = self.task_to_string(task_class)
|
|
|
|
if klass_str in self._registry:
|
|
del(self._registry[klass_str])
|
|
|
|
for task in self._periodic_tasks:
|
|
if isinstance(task, task_class):
|
|
self._periodic_tasks.remove(task)
|
|
|
|
def __contains__(self, klass_str):
|
|
return klass_str in self._registry
|
|
|
|
def get_message_for_task(self, task):
|
|
"""Convert a task object to a message for storage in the queue"""
|
|
return pickle.dumps((
|
|
task.task_id,
|
|
self.task_to_string(type(task)),
|
|
task.execute_time,
|
|
task.retries,
|
|
task.retry_delay,
|
|
task.get_data(),
|
|
))
|
|
|
|
def get_task_class(self, klass_str):
|
|
klass = self._registry.get(klass_str)
|
|
|
|
if not klass:
|
|
raise QueueException('%s not found in TaskRegistry' % klass_str)
|
|
|
|
return klass
|
|
|
|
def get_task_for_message(self, msg):
|
|
"""Convert a message from the queue into a task"""
|
|
# parse out the pieces from the enqueued message
|
|
raw = pickle.loads(msg)
|
|
task_id, klass_str, execute_time, retries, delay, data = raw
|
|
|
|
klass = self.get_task_class(klass_str)
|
|
return klass(data, task_id, execute_time, retries, delay)
|
|
|
|
def get_periodic_tasks(self):
|
|
return self._periodic_tasks
|
|
|
|
|
|
registry = TaskRegistry()
|