From 783e7e6d0ddc37e51fbe2b1eb11d4392418e4ce9 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 29 Mar 2017 18:01:35 +0200 Subject: [PATCH] added support for rpm packaging and basic support for deb --- deb/control | 10 + deb/createdeb.sh | 34 ++ lib/huey/__init__.py | 62 +++ lib/huey/api.py | 513 ++++++++++++++++++ lib/huey/backends/__init__.py | 0 lib/huey/backends/base.py | 113 ++++ lib/huey/backends/dummy.py | 103 ++++ lib/huey/backends/rabbitmq_backend.py | 153 ++++++ lib/huey/backends/redis_backend.py | 153 ++++++ lib/huey/backends/sqlite_backend.py | 205 +++++++ lib/huey/bin/__init__.py | 0 lib/huey/bin/huey_consumer.py | 121 +++++ lib/huey/consumer.py | 279 ++++++++++ lib/huey/djhuey/__init__.py | 119 ++++ lib/huey/djhuey/management/__init__.py | 0 .../djhuey/management/commands/__init__.py | 0 .../djhuey/management/commands/run_huey.py | 126 +++++ lib/huey/djhuey/models.py | 0 lib/huey/exceptions.py | 26 + lib/huey/peewee_helpers/__init__.py | 20 + lib/huey/registry.py | 77 +++ lib/huey/tests/__init__.py | 10 + lib/huey/tests/backends.py | 170 ++++++ lib/huey/tests/consumer.py | 441 +++++++++++++++ lib/huey/tests/crontab.py | 91 ++++ lib/huey/tests/peewee_tests.py | 62 +++ lib/huey/tests/queue.py | 438 +++++++++++++++ lib/huey/tests/utils.py | 24 + lib/huey/utils.py | 21 + rpm/build.sh | 12 + rpm/tis-tisbackup.spec | 54 ++ scripts/tisbackup_gui.service | 9 + scripts/tisbackup_huey.service | 9 + 33 files changed, 3455 insertions(+) create mode 100644 deb/control create mode 100755 deb/createdeb.sh create mode 100644 lib/huey/__init__.py create mode 100644 lib/huey/api.py create mode 100644 lib/huey/backends/__init__.py create mode 100644 lib/huey/backends/base.py create mode 100644 lib/huey/backends/dummy.py create mode 100644 lib/huey/backends/rabbitmq_backend.py create mode 100644 lib/huey/backends/redis_backend.py create mode 100644 lib/huey/backends/sqlite_backend.py create mode 100644 lib/huey/bin/__init__.py create mode 100755 lib/huey/bin/huey_consumer.py create mode 100644 lib/huey/consumer.py create mode 100644 lib/huey/djhuey/__init__.py create mode 100644 lib/huey/djhuey/management/__init__.py create mode 100644 lib/huey/djhuey/management/commands/__init__.py create mode 100644 lib/huey/djhuey/management/commands/run_huey.py create mode 100644 lib/huey/djhuey/models.py create mode 100644 lib/huey/exceptions.py create mode 100644 lib/huey/peewee_helpers/__init__.py create mode 100644 lib/huey/registry.py create mode 100644 lib/huey/tests/__init__.py create mode 100644 lib/huey/tests/backends.py create mode 100644 lib/huey/tests/consumer.py create mode 100644 lib/huey/tests/crontab.py create mode 100644 lib/huey/tests/peewee_tests.py create mode 100644 lib/huey/tests/queue.py create mode 100644 lib/huey/tests/utils.py create mode 100644 lib/huey/utils.py create mode 100755 rpm/build.sh create mode 100644 rpm/tis-tisbackup.spec create mode 100644 scripts/tisbackup_gui.service create mode 100644 scripts/tisbackup_huey.service diff --git a/deb/control b/deb/control new file mode 100644 index 0000000..1bd0be9 --- /dev/null +++ b/deb/control @@ -0,0 +1,10 @@ +Package: tis-tisbackup +Version: VERSION +Section: base +Priority: optional +Architecture: all +Depends: unzip ssh rsync python-paramiko python-pyvmomi python-pexpect +Maintainer: Tranquil-IT-Systems +Description: TISBackup backup management +Homepage: http://www.tranquil-it-systems.fr + diff --git a/deb/createdeb.sh b/deb/createdeb.sh new file mode 100755 index 0000000..70d9883 --- /dev/null +++ b/deb/createdeb.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +#svn --username svnuser up +#VERSION=$(svn info |awk '/Revi/{print $2}') +VERSION=0.1 +VERSION=$VERSION-$(git rev-parse --short HEAD) +rm -f *.deb +rm -Rf builddir +mkdir builddir +mkdir builddir/DEBIAN +cp ./control ./builddir/DEBIAN +#cp ./files/postinst ./builddir/DEBIAN +#cp ./files/prerm ./builddir/DEBIAN + +sed "s/VERSION/$VERSION/" -i ./builddir/DEBIAN/control + +mkdir -p builddir/opt/tisbackup/ +mkdir -p ./builddir/usr/lib/systemd/system/ + +#cp ../scripts/tisbackup_gui.service ./builddir/usr/lib/systemd/system/ +rsync -aP --exclude=deb ../ ./builddir/opt/tisbackup + +#tis-arpwatch +#chmod 755 ./builddir/opt/tis-nagios/*.py +#chmod 755 ./builddir/etc/init.d/tis-arpwatch + + +dpkg-deb --build builddir tis-tisbackup-${VERSION}.deb + +#echo "== Copie du .deb sur le serveur tisdeb ==" +#scp *.deb root@srvinstallation:/var/www/srvinstallation/tisdeb/binary + +#echo "== Scan du répertoire ==" +#ssh root@srvinstallation /var/www/srvinstallation/tisdeb/updateRepo.sh + diff --git a/lib/huey/__init__.py b/lib/huey/__init__.py new file mode 100644 index 0000000..5a4db0a --- /dev/null +++ b/lib/huey/__init__.py @@ -0,0 +1,62 @@ +__author__ = 'Charles Leifer' +__license__ = 'MIT' +__version__ = '0.4.9' + +from huey.api import Huey, crontab + +try: + import redis + from huey.backends.redis_backend import RedisBlockingQueue + from huey.backends.redis_backend import RedisDataStore + from huey.backends.redis_backend import RedisEventEmitter + from huey.backends.redis_backend import RedisSchedule + + class RedisHuey(Huey): + def __init__(self, name='huey', store_none=False, always_eager=False, + read_timeout=None, **conn_kwargs): + queue = RedisBlockingQueue( + name, + read_timeout=read_timeout, + **conn_kwargs) + result_store = RedisDataStore(name, **conn_kwargs) + schedule = RedisSchedule(name, **conn_kwargs) + events = RedisEventEmitter(name, **conn_kwargs) + super(RedisHuey, self).__init__( + queue=queue, + result_store=result_store, + schedule=schedule, + events=events, + store_none=store_none, + always_eager=always_eager) + +except ImportError: + class RedisHuey(object): + def __init__(self, *args, **kwargs): + raise RuntimeError('Error, "redis" is not installed. Install ' + 'using pip: "pip install redis"') + +try: + from huey.backends.sqlite_backend import SqliteQueue + from huey.backends.sqlite_backend import SqliteDataStore + from huey.backends.sqlite_backend import SqliteSchedule + + class SqliteHuey(Huey): + def __init__(self, name='huey', store_none=False, always_eager=False, + location=None): + if location is None: + raise ValueError("Please specify a database file with the " + "'location' parameter") + queue = SqliteQueue(name, location) + result_store = SqliteDataStore(name, location) + schedule = SqliteSchedule(name, location) + super(SqliteHuey, self).__init__( + queue=queue, + result_store=result_store, + schedule=schedule, + events=None, + store_none=store_none, + always_eager=always_eager) +except ImportError: + class SqliteHuey(object): + def __init__(self, *args, **kwargs): + raise RuntimeError('Error, "sqlite" is not installed.') diff --git a/lib/huey/api.py b/lib/huey/api.py new file mode 100644 index 0000000..3cc9490 --- /dev/null +++ b/lib/huey/api.py @@ -0,0 +1,513 @@ +import datetime +import json +import pickle +import re +import time +import traceback +import uuid +from functools import wraps + +from huey.backends.dummy import DummySchedule +from huey.exceptions import DataStoreGetException +from huey.exceptions import DataStorePutException +from huey.exceptions import DataStoreTimeout +from huey.exceptions import QueueException +from huey.exceptions import QueueReadException +from huey.exceptions import QueueRemoveException +from huey.exceptions import QueueWriteException +from huey.exceptions import ScheduleAddException +from huey.exceptions import ScheduleReadException +from huey.registry import registry +from huey.utils import EmptyData +from huey.utils import local_to_utc +from huey.utils import wrap_exception + + +class Huey(object): + """ + Huey executes tasks by exposing function decorators that cause the function + call to be enqueued for execution by the consumer. + + Typically your application will only need one Huey instance, but you can + have as many as you like -- the only caveat is that one consumer process + must be executed for each Huey instance. + + :param queue: a queue instance, e.g. ``RedisQueue()`` + :param result_store: a place to store results, e.g. ``RedisResultStore()`` + :param schedule: a place to store pending tasks, e.g. ``RedisSchedule()`` + :param events: channel to send events on, e.g. ``RedisEventEmitter()`` + :param store_none: Flag to indicate whether tasks that return ``None`` + should store their results in the result store. + :param always_eager: Useful for testing, this will execute all tasks + immediately, without enqueueing them. + + Example usage:: + + from huey.api import Huey, crontab + from huey.backends.redis_backend import RedisQueue, RedisDataStore, RedisSchedule + + queue = RedisQueue('my-app') + result_store = RedisDataStore('my-app') + schedule = RedisSchedule('my-app') + huey = Huey(queue, result_store, schedule) + + # This is equivalent to the previous 4 lines: + # huey = RedisHuey('my-app', {'host': 'localhost', 'port': 6379}) + + @huey.task() + def slow_function(some_arg): + # ... do something ... + return some_arg + + @huey.periodic_task(crontab(minute='0', hour='3')) + def backup(): + # do a backup every day at 3am + return + """ + def __init__(self, queue, result_store=None, schedule=None, events=None, + store_none=False, always_eager=False): + self.queue = queue + self.result_store = result_store + self.schedule = schedule or DummySchedule(self.queue.name) + self.events = events + self.blocking = self.queue.blocking + self.store_none = store_none + self.always_eager = always_eager + + def task(self, retries=0, retry_delay=0, retries_as_argument=False, + include_task=False, name=None): + def decorator(func): + """ + Decorator to execute a function out-of-band via the consumer. + """ + klass = create_task( + QueueTask, + func, + retries_as_argument, + name, + include_task) + + def schedule(args=None, kwargs=None, eta=None, delay=None, + convert_utc=True, task_id=None): + if delay and eta: + raise ValueError('Both a delay and an eta cannot be ' + 'specified at the same time') + if delay: + eta = (datetime.datetime.now() + + datetime.timedelta(seconds=delay)) + if convert_utc and eta: + eta = local_to_utc(eta) + cmd = klass( + (args or (), kwargs or {}), + execute_time=eta, + retries=retries, + retry_delay=retry_delay, + task_id=task_id) + return self.enqueue(cmd) + + func.schedule = schedule + func.task_class = klass + + @wraps(func) + def inner_run(*args, **kwargs): + cmd = klass( + (args, kwargs), + retries=retries, + retry_delay=retry_delay) + return self.enqueue(cmd) + + inner_run.call_local = func + return inner_run + return decorator + + def periodic_task(self, validate_datetime, name=None): + """ + Decorator to execute a function on a specific schedule. + """ + def decorator(func): + def method_validate(self, dt): + return validate_datetime(dt) + + klass = create_task( + PeriodicQueueTask, + func, + task_name=name, + validate_datetime=method_validate, + ) + + func.task_class = klass + + def _revoke(revoke_until=None, revoke_once=False): + self.revoke(klass(), revoke_until, revoke_once) + func.revoke = _revoke + + def _is_revoked(dt=None, peek=True): + return self.is_revoked(klass(), dt, peek) + func.is_revoked = _is_revoked + + def _restore(): + return self.restore(klass()) + func.restore = _restore + + return func + return decorator + + def _wrapped_operation(exc_class): + def decorator(fn): + def inner(*args, **kwargs): + try: + return fn(*args, **kwargs) + except: + wrap_exception(exc_class) + return inner + return decorator + + @_wrapped_operation(QueueWriteException) + def _write(self, msg): + self.queue.write(msg) + + @_wrapped_operation(QueueReadException) + def _read(self): + return self.queue.read() + + @_wrapped_operation(QueueRemoveException) + def _remove(self, msg): + return self.queue.remove(msg) + + @_wrapped_operation(DataStoreGetException) + def _get(self, key, peek=False): + if peek: + return self.result_store.peek(key) + else: + return self.result_store.get(key) + + @_wrapped_operation(DataStorePutException) + def _put(self, key, value): + return self.result_store.put(key, value) + + @_wrapped_operation(ScheduleAddException) + def _add_schedule(self, data, ts): + if self.schedule is None: + raise AttributeError('Schedule not specified.') + self.schedule.add(data, ts) + + @_wrapped_operation(ScheduleReadException) + def _read_schedule(self, ts): + if self.schedule is None: + raise AttributeError('Schedule not specified.') + return self.schedule.read(ts) + + def emit(self, message): + """Events should always fail silently.""" + try: + self.events.emit(message) + except: + pass + + def enqueue(self, task): + if self.always_eager: + return task.execute() + + self._write(registry.get_message_for_task(task)) + + if self.result_store: + return AsyncData(self, task) + + def dequeue(self): + message = self._read() + if message: + return registry.get_task_for_message(message) + + def _format_time(self, dt): + if dt is None: + return None + return time.mktime(dt.timetuple()) + + def emit_task(self, status, task, error=False): + if self.events: + message_data = {'status': status} + message_data.update({ + 'id': task.task_id, + 'task': type(task).__name__, + 'retries': task.retries, + 'retry_delay': task.retry_delay, + 'execute_time': self._format_time(task.execute_time), + 'error': error}) + if error: + message_data['traceback'] = traceback.format_exc() + self.emit(json.dumps(message_data)) + + def execute(self, task): + if not isinstance(task, QueueTask): + raise TypeError('Unknown object: %s' % task) + + result = task.execute() + + if result is None and not self.store_none: + return + + if self.result_store and not isinstance(task, PeriodicQueueTask): + self._put(task.task_id, pickle.dumps(result)) + + return result + + def revoke(self, task, revoke_until=None, revoke_once=False): + if not self.result_store: + raise QueueException('A DataStore is required to revoke task') + + serialized = pickle.dumps((revoke_until, revoke_once)) + self._put(task.revoke_id, serialized) + + def restore(self, task): + self._get(task.revoke_id) # simply get and delete if there + + def is_revoked(self, task, dt=None, peek=True): + if not self.result_store: + return False + res = self._get(task.revoke_id, peek=True) + if res is EmptyData: + return False + revoke_until, revoke_once = pickle.loads(res) + if revoke_once: + # This task *was* revoked for one run, but now it should be + # restored to normal execution. + if not peek: + self.restore(task) + return True + return revoke_until is None or revoke_until > dt + + def add_schedule(self, task): + msg = registry.get_message_for_task(task) + ex_time = task.execute_time or datetime.datetime.fromtimestamp(0) + self._add_schedule(msg, ex_time) + + def read_schedule(self, ts): + return [ + registry.get_task_for_message(m) for m in self._read_schedule(ts)] + + def flush(self): + self.queue.flush() + + def ready_to_run(self, cmd, dt=None): + dt = dt or datetime.datetime.utcnow() + return cmd.execute_time is None or cmd.execute_time <= dt + + +class AsyncData(object): + def __init__(self, huey, task): + self.huey = huey + self.task = task + + self._result = EmptyData + + def _get(self): + task_id = self.task.task_id + if self._result is EmptyData: + res = self.huey._get(task_id) + + if res is not EmptyData: + self._result = pickle.loads(res) + return self._result + else: + return res + else: + return self._result + + def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0, + revoke_on_timeout=False): + if not blocking: + res = self._get() + if res is not EmptyData: + return res + else: + start = time.time() + delay = .1 + while self._result is EmptyData: + if timeout and time.time() - start >= timeout: + if revoke_on_timeout: + self.revoke() + raise DataStoreTimeout + if delay > max_delay: + delay = max_delay + if self._get() is EmptyData: + time.sleep(delay) + delay *= backoff + + return self._result + + def revoke(self): + self.huey.revoke(self.task) + + def restore(self): + self.huey.restore(self.task) + + +def with_metaclass(meta, base=object): + return meta("NewBase", (base,), {}) + + +class QueueTaskMetaClass(type): + def __init__(cls, name, bases, attrs): + """ + Metaclass to ensure that all task classes are registered + """ + registry.register(cls) + + +class QueueTask(with_metaclass(QueueTaskMetaClass)): + """ + A class that encapsulates the logic necessary to 'do something' given some + arbitrary data. When enqueued with the :class:`Huey`, it will be + stored in a queue for out-of-band execution via the consumer. See also + the :meth:`task` decorator, which can be used to automatically + execute any function out-of-band. + + Example:: + + class SendEmailTask(QueueTask): + def execute(self): + data = self.get_data() + send_email(data['recipient'], data['subject'], data['body']) + + huey.enqueue( + SendEmailTask({ + 'recipient': 'somebody@spam.com', + 'subject': 'look at this awesome website', + 'body': 'http://youtube.com' + }) + ) + """ + + def __init__(self, data=None, task_id=None, execute_time=None, retries=0, + retry_delay=0): + self.set_data(data) + self.task_id = task_id or self.create_id() + self.revoke_id = 'r:%s' % self.task_id + self.execute_time = execute_time + self.retries = retries + self.retry_delay = retry_delay + + def create_id(self): + return str(uuid.uuid4()) + + def get_data(self): + return self.data + + def set_data(self, data): + self.data = data + + def execute(self): + """Execute any arbitary code here""" + raise NotImplementedError + + def __eq__(self, rhs): + return ( + self.task_id == rhs.task_id and + self.execute_time == rhs.execute_time and + type(self) == type(rhs)) + + +class PeriodicQueueTask(QueueTask): + def create_id(self): + return registry.task_to_string(type(self)) + + def validate_datetime(self, dt): + """Validate that the task should execute at the given datetime""" + return False + + +def create_task(task_class, func, retries_as_argument=False, task_name=None, + include_task=False, **kwargs): + def execute(self): + args, kwargs = self.data or ((), {}) + if retries_as_argument: + kwargs['retries'] = self.retries + if include_task: + kwargs['task'] = self + return func(*args, **kwargs) + + attrs = { + 'execute': execute, + '__module__': func.__module__, + '__doc__': func.__doc__ + } + attrs.update(kwargs) + + klass = type( + task_name or 'queuecmd_%s' % (func.__name__), + (task_class,), + attrs + ) + + return klass + +dash_re = re.compile('(\d+)-(\d+)') +every_re = re.compile('\*\/(\d+)') + +def crontab(month='*', day='*', day_of_week='*', hour='*', minute='*'): + """ + Convert a "crontab"-style set of parameters into a test function that will + return True when the given datetime matches the parameters set forth in + the crontab. + + Acceptable inputs: + * = every distinct value + */n = run every "n" times, i.e. hours='*/4' == 0, 4, 8, 12, 16, 20 + m-n = run every time m..n + m,n = run on m and n + """ + validation = ( + ('m', month, range(1, 13)), + ('d', day, range(1, 32)), + ('w', day_of_week, range(7)), + ('H', hour, range(24)), + ('M', minute, range(60)) + ) + cron_settings = [] + + for (date_str, value, acceptable) in validation: + settings = set([]) + + if isinstance(value, int): + value = str(value) + + for piece in value.split(','): + if piece == '*': + settings.update(acceptable) + continue + + if piece.isdigit(): + piece = int(piece) + if piece not in acceptable: + raise ValueError('%d is not a valid input' % piece) + settings.add(piece) + + else: + dash_match = dash_re.match(piece) + if dash_match: + lhs, rhs = map(int, dash_match.groups()) + if lhs not in acceptable or rhs not in acceptable: + raise ValueError('%s is not a valid input' % piece) + settings.update(range(lhs, rhs+1)) + continue + + every_match = every_re.match(piece) + if every_match: + interval = int(every_match.groups()[0]) + settings.update(acceptable[::interval]) + + cron_settings.append(sorted(list(settings))) + + def validate_date(dt): + _, m, d, H, M, _, w, _, _ = dt.timetuple() + + # fix the weekday to be sunday=0 + w = (w + 1) % 7 + + for (date_piece, selection) in zip([m, d, w, H, M], cron_settings): + if date_piece not in selection: + return False + + return True + + return validate_date diff --git a/lib/huey/backends/__init__.py b/lib/huey/backends/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/huey/backends/base.py b/lib/huey/backends/base.py new file mode 100644 index 0000000..ee7da35 --- /dev/null +++ b/lib/huey/backends/base.py @@ -0,0 +1,113 @@ +class BaseQueue(object): + """ + Base implementation for a Queue, all backends should subclass + """ + + # whether this backend blocks while waiting for new results or should be + # polled by the consumer + blocking = False + + def __init__(self, name, **connection): + """ + Initialize the Queue - this happens once when the module is loaded + + :param name: A string representation of the name for this queue + :param connection: Connection parameters for the queue + """ + self.name = name + self.connection = connection + + def write(self, data): + """ + Push 'data' onto the queue + """ + raise NotImplementedError + + def read(self): + """ + Pop 'data' from the queue, returning None if no data is available -- + an empty queue should not raise an Exception! + """ + raise NotImplementedError + + def remove(self, data): + """ + Remove the given data from the queue + """ + raise NotImplementedError + + def flush(self): + """ + Delete everything from the queue + """ + raise NotImplementedError + + def __len__(self): + """ + Used primarily in tests, but return the number of items in the queue + """ + raise NotImplementedError + + +class BaseSchedule(object): + def __init__(self, name, **connection): + """ + Initialize the Queue - this happens once when the module is loaded + + :param name: A string representation of the name for this queue + :param connection: Connection parameters for the queue + """ + self.name = name + self.connection = connection + + def add(self, data, ts): + """ + Add the timestamped data to the task schedule. + """ + raise NotImplementedError + + def read(self, ts): + """ + Read scheduled items for the given timestamp + """ + raise NotImplementedError + + def flush(self): + """Delete all items in schedule.""" + raise NotImplementedError + + +class BaseDataStore(object): + """ + Base implementation for a data store + """ + def __init__(self, name, **connection): + """ + Initialize the data store + """ + self.name = name + self.connection = connection + + def put(self, key, value): + raise NotImplementedError + + def peek(self, key): + raise NotImplementedError + + def get(self, key): + raise NotImplementedError + + def flush(self): + raise NotImplementedError + + +class BaseEventEmitter(object): + def __init__(self, channel, **connection): + self.channel = channel + self.connection = connection + + def emit(self, message): + raise NotImplementedError + + +Components = (BaseQueue, BaseDataStore, BaseSchedule, BaseEventEmitter) diff --git a/lib/huey/backends/dummy.py b/lib/huey/backends/dummy.py new file mode 100644 index 0000000..9b1b696 --- /dev/null +++ b/lib/huey/backends/dummy.py @@ -0,0 +1,103 @@ +""" +Test-only implementations of Queue and DataStore. These will not work for +real applications because they only store tasks/results in memory. +""" +from collections import deque +import heapq + +from huey.backends.base import BaseDataStore +from huey.backends.base import BaseEventEmitter +from huey.backends.base import BaseQueue +from huey.backends.base import BaseSchedule +from huey.utils import EmptyData + + +class DummyQueue(BaseQueue): + def __init__(self, *args, **kwargs): + super(DummyQueue, self).__init__(*args, **kwargs) + self._queue = [] + + def write(self, data): + self._queue.insert(0, data) + + def read(self): + try: + return self._queue.pop() + except IndexError: + return None + + def flush(self): + self._queue = [] + + def remove(self, data): + clone = [] + ct = 0 + for elem in self._queue: + if elem == data: + ct += 1 + else: + clone.append(elem) + self._queue = clone + return ct + + def __len__(self): + return len(self._queue) + + +class DummySchedule(BaseSchedule): + def __init__(self, *args, **kwargs): + super(DummySchedule, self).__init__(*args, **kwargs) + self._schedule = [] + + def add(self, data, ts): + heapq.heappush(self._schedule, (ts, data)) + + def read(self, ts): + res = [] + while len(self._schedule): + sts, data = heapq.heappop(self._schedule) + if sts <= ts: + res.append(data) + else: + self.add(data, sts) + break + return res + + def flush(self): + self._schedule = [] + + +class DummyDataStore(BaseDataStore): + def __init__(self, *args, **kwargs): + super(DummyDataStore, self).__init__(*args, **kwargs) + self._results = {} + + def put(self, key, value): + self._results[key] = value + + def peek(self, key): + return self._results.get(key, EmptyData) + + def get(self, key): + return self._results.pop(key, EmptyData) + + def flush(self): + self._results = {} + + +class DummyEventEmitter(BaseEventEmitter): + def __init__(self, *args, **kwargs): + super(DummyEventEmitter, self).__init__(*args, **kwargs) + self._events = deque() + self.__size = 100 + + def emit(self, message): + self._events.appendleft(message) + num_events = len(self._events) + if num_events > self.__size * 1.5: + while num_events > self.__size: + self._events.popright() + num_events -= 1 + + +Components = (DummyQueue, DummyDataStore, DummySchedule, DummyEventEmitter) diff --git a/lib/huey/backends/rabbitmq_backend.py b/lib/huey/backends/rabbitmq_backend.py new file mode 100644 index 0000000..c23cb4d --- /dev/null +++ b/lib/huey/backends/rabbitmq_backend.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +__author__ = 'deathowl' + +import datetime +import re +import time +import pika +from pika.exceptions import AMQPConnectionError + +from huey.backends.base import BaseEventEmitter +from huey.backends.base import BaseQueue + + +def clean_name(name): + return re.sub('[^a-z0-9]', '', name) + + +class RabbitQueue(BaseQueue): + """ + A simple Queue that uses the rabbit to store messages + """ + def __init__(self, name, **connection): + """ + connection = { + 'host': 'localhost', + 'port': 5672, + 'username': 'guest', + 'password': 'guest', + 'vhost': '/', + 'ssl': False + } + """ + super(RabbitQueue, self).__init__(name, **connection) + + self.queue_name = 'huey.rabbit.%s' % clean_name(name) + credentials = pika.PlainCredentials( + connection.get('username', 'guest'), + connection.get('password', 'guest')) + connection_params = pika.ConnectionParameters( + host=connection.get('host', 'localhost'), + port=connection.get('port', 5672), + credentials=credentials, + virtual_host=connection.get('vhost', '/'), + ssl=connection.get('ssl', False)) + + self.conn = pika.BlockingConnection(connection_params) + self.channel = self.conn.channel() + self.channel.queue_declare(self.queue_name, durable=True) + + def write(self, data): + self.channel.basic_publish( + exchange='', + routing_key=self.queue_name, + body=data) + + def read(self): + return self.get_data_from_queue(self.queue_name) + + def remove(self, data): + # This is not something you usually do in rabbit, this is the only + # operation, which is not atomic, but this "hack" should do the trick. + amount = 0 + idx = 0 + qlen = len(self) + + for method_frame, _, body in self.channel.consume(self.queue_name): + idx += 1 + if body == data: + self.channel.basic_ack(method_frame.delivery_tag) + amount += 1 + else: + self.channel.basic_nack( + method_frame.delivery_tag, + requeue=True) + + if idx >= qlen: + break + + self.channel.cancel() + return amount + + def flush(self): + self.channel.queue_purge(queue=self.queue_name) + return True + + def __len__(self): + queue = self.channel.queue_declare(self.queue_name, durable=True) + return queue.method.message_count + + def get_data_from_queue(self, queue): + data = None + if len(self) == 0: + return None + + for method_frame, _, body in self.channel.consume(queue): + data = body + self.channel.basic_ack(method_frame.delivery_tag) + break + + self.channel.cancel() + return data + + +class RabbitBlockingQueue(RabbitQueue): + """ + Use the blocking right pop, should result in messages getting + executed close to immediately by the consumer as opposed to + being polled for + """ + blocking = True + + def read(self): + try: + return self.get_data_from_queue(self.queue_name) + except AMQPConnectionError: + return None + + +class RabbitEventEmitter(BaseEventEmitter): + def __init__(self, channel, **connection): + super(RabbitEventEmitter, self).__init__(channel, **connection) + credentials = pika.PlainCredentials( + connection.get('username', 'guest'), + connection.get('password', 'guest')) + connection_params = pika.ConnectionParameters( + host=connection.get('host', 'localhost'), + port=connection.get('port', 5672), + credentials=credentials, + virtual_host=connection.get('vhost', '/'), + ssl=connection.get('ssl', False)) + + self.conn = pika.BlockingConnection(connection_params) + self.channel = self.conn.channel() + self.exchange_name = 'huey.events' + self.channel.exchange_declare( + exchange=self.exchange_name, + type='fanout', + auto_delete=False, + durable=True) + + def emit(self, message): + properties = pika.BasicProperties( + content_type="text/plain", + delivery_mode=2) + + self.channel.basic_publish( + exchange=self.exchange_name, + routing_key='', + body=message, + properties=properties) + + +Components = (RabbitBlockingQueue, None, None, RabbitEventEmitter) diff --git a/lib/huey/backends/redis_backend.py b/lib/huey/backends/redis_backend.py new file mode 100644 index 0000000..e92ab13 --- /dev/null +++ b/lib/huey/backends/redis_backend.py @@ -0,0 +1,153 @@ +import re +import time + +import redis +from redis.exceptions import ConnectionError + +from huey.backends.base import BaseDataStore +from huey.backends.base import BaseEventEmitter +from huey.backends.base import BaseQueue +from huey.backends.base import BaseSchedule +from huey.utils import EmptyData + + +def clean_name(name): + return re.sub('[^a-z0-9]', '', name) + + +class RedisQueue(BaseQueue): + """ + A simple Queue that uses the redis to store messages + """ + def __init__(self, name, **connection): + """ + connection = { + 'host': 'localhost', + 'port': 6379, + 'db': 0, + } + """ + super(RedisQueue, self).__init__(name, **connection) + + self.queue_name = 'huey.redis.%s' % clean_name(name) + self.conn = redis.Redis(**connection) + + def write(self, data): + self.conn.lpush(self.queue_name, data) + + def read(self): + return self.conn.rpop(self.queue_name) + + def remove(self, data): + return self.conn.lrem(self.queue_name, data) + + def flush(self): + self.conn.delete(self.queue_name) + + def __len__(self): + return self.conn.llen(self.queue_name) + + +class RedisBlockingQueue(RedisQueue): + """ + Use the blocking right pop, should result in messages getting + executed close to immediately by the consumer as opposed to + being polled for + """ + blocking = True + + def __init__(self, name, read_timeout=None, **connection): + """ + connection = { + 'host': 'localhost', + 'port': 6379, + 'db': 0, + } + """ + super(RedisBlockingQueue, self).__init__(name, **connection) + self.read_timeout = read_timeout + + def read(self): + try: + return self.conn.brpop( + self.queue_name, + timeout=self.read_timeout)[1] + except (ConnectionError, TypeError, IndexError): + # unfortunately, there is no way to differentiate a socket timing + # out and a host being unreachable + return None + + +# a custom lua script to pass to redis that will read tasks from the schedule +# and atomically pop them from the sorted set and return them. +# it won't return anything if it isn't able to remove the items it reads. +SCHEDULE_POP_LUA = """ +local key = KEYS[1] +local unix_ts = ARGV[1] +local res = redis.call('zrangebyscore', key, '-inf', unix_ts) +if #res and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then + return res +end +""" + +class RedisSchedule(BaseSchedule): + def __init__(self, name, **connection): + super(RedisSchedule, self).__init__(name, **connection) + + self.key = 'huey.schedule.%s' % clean_name(name) + self.conn = redis.Redis(**connection) + self._pop = self.conn.register_script(SCHEDULE_POP_LUA) + + def convert_ts(self, ts): + return time.mktime(ts.timetuple()) + + def add(self, data, ts): + self.conn.zadd(self.key, data, self.convert_ts(ts)) + + def read(self, ts): + unix_ts = self.convert_ts(ts) + # invoke the redis lua script that will atomically pop off + # all the tasks older than the given timestamp + tasks = self._pop(keys=[self.key], args=[unix_ts]) + return [] if tasks is None else tasks + + def flush(self): + self.conn.delete(self.key) + + +class RedisDataStore(BaseDataStore): + def __init__(self, name, **connection): + super(RedisDataStore, self).__init__(name, **connection) + + self.storage_name = 'huey.results.%s' % clean_name(name) + self.conn = redis.Redis(**connection) + + def put(self, key, value): + self.conn.hset(self.storage_name, key, value) + + def peek(self, key): + if self.conn.hexists(self.storage_name, key): + return self.conn.hget(self.storage_name, key) + return EmptyData + + def get(self, key): + val = self.peek(key) + if val is not EmptyData: + self.conn.hdel(self.storage_name, key) + return val + + def flush(self): + self.conn.delete(self.storage_name) + + +class RedisEventEmitter(BaseEventEmitter): + def __init__(self, channel, **connection): + super(RedisEventEmitter, self).__init__(channel, **connection) + self.conn = redis.Redis(**connection) + + def emit(self, message): + self.conn.publish(self.channel, message) + + +Components = (RedisBlockingQueue, RedisDataStore, RedisSchedule, + RedisEventEmitter) diff --git a/lib/huey/backends/sqlite_backend.py b/lib/huey/backends/sqlite_backend.py new file mode 100644 index 0000000..dd801c9 --- /dev/null +++ b/lib/huey/backends/sqlite_backend.py @@ -0,0 +1,205 @@ +""" SQLite backend for Huey. + +Inspired from a snippet by Thiago Arruda [1] + +[1] http://flask.pocoo.org/snippets/88/ +""" +import json +import sqlite3 +import time +try: + from thread import get_ident +except ImportError: # Python 3 + try: + from threading import get_ident + except ImportError: + from _thread import get_ident + buffer = memoryview + +from huey.backends.base import BaseDataStore +from huey.backends.base import BaseEventEmitter +from huey.backends.base import BaseQueue +from huey.backends.base import BaseSchedule +from huey.utils import EmptyData + + +class _SqliteDatabase(object): + def __init__(self, location): + if location == ':memory:': + raise ValueError("Database location has to be a file path, " + "in-memory databases are not supported.") + self.location = location + self._conn_cache = {} + with self.get_connection() as conn: + # Enable write-ahead logging + conn.execute("PRAGMA journal_mode=WAL;") + # Hand over syncing responsibility to OS + conn.execute("PRAGMA synchronous=OFF;") + # Store temporary tables and indices in memory + conn.execute("PRAGMA temp_store=MEMORY;") + + def get_connection(self, immediate=False): + """ Obtain a sqlite3.Connection instance for the database. + + Connections are cached on a by-thread basis, i.e. every calling thread + will always get the same Connection object back. + """ + if immediate: + return sqlite3.Connection(self.location, timeout=60, + isolation_level="IMMEDIATE") + id = get_ident() + if id not in self._conn_cache: + self._conn_cache[id] = sqlite3.Connection( + self.location, timeout=60) + return self._conn_cache[id] + + +class SqliteQueue(BaseQueue): + """ + A simple Queue that uses SQLite to store messages + """ + _create = """ + CREATE TABLE IF NOT EXISTS {0} + ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + item BLOB + ) + """ + _count = "SELECT COUNT(*) FROM {0}" + _append = "INSERT INTO {0} (item) VALUES (?)" + _get = "SELECT id, item FROM {0} ORDER BY id LIMIT 1" + _remove_by_value = "DELETE FROM {0} WHERE item = ?" + _remove_by_id = "DELETE FROM {0} WHERE id = ?" + _flush = "DELETE FROM {0}" + + def __init__(self, name, location): + super(SqliteQueue, self).__init__(name, location=location) + self.queue_name = 'huey_queue_{0}'.format(name) + self._db = _SqliteDatabase(location) + with self._db.get_connection() as conn: + conn.execute(self._create.format(self.queue_name)) + + def write(self, data): + with self._db.get_connection() as conn: + conn.execute(self._append.format(self.queue_name), (data,)) + + def read(self): + with self._db.get_connection(immediate=True) as conn: + cursor = conn.execute(self._get.format(self.queue_name)) + try: + id, data = next(cursor) + except StopIteration: + return None + if id: + conn.execute(self._remove_by_id.format(self.queue_name), (id,)) + return data + + def remove(self, data): + with self._db.get_connection() as conn: + return conn.execute(self._remove_by_value.format(self.queue_name), + (data,)).rowcount + + def flush(self): + with self._db.get_connection() as conn: + conn.execute(self._flush.format(self.queue_name,)) + + def __len__(self): + with self._db.get_connection() as conn: + return next(conn.execute(self._count.format(self.queue_name)))[0] + + +class SqliteSchedule(BaseSchedule): + _create = """ + CREATE TABLE IF NOT EXISTS {0} + ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + item BLOB, + timestamp INTEGER + ) + """ + _read_items = """ + SELECT item, timestamp FROM {0} WHERE timestamp <= ? + ORDER BY timestamp + """ + _delete_items = "DELETE FROM {0} WHERE timestamp <= ?" + _add_item = "INSERT INTO {0} (item, timestamp) VALUES (?, ?)" + _flush = "DELETE FROM {0}" + + def __init__(self, name, location): + super(SqliteSchedule, self).__init__(name, location=location) + self._db = _SqliteDatabase(location) + self.name = 'huey_schedule_{0}'.format(name) + with self._db.get_connection() as conn: + conn.execute(self._create.format(self.name)) + + def convert_ts(self, ts): + return time.mktime(ts.timetuple()) + + def add(self, data, ts): + with self._db.get_connection() as conn: + conn.execute(self._add_item.format(self.name), + (data, self.convert_ts(ts))) + + def read(self, ts): + with self._db.get_connection() as conn: + results = conn.execute(self._read_items.format(self.name), + (self.convert_ts(ts),)).fetchall() + conn.execute(self._delete_items.format(self.name), + (self.convert_ts(ts),)) + return [data for data, _ in results] + + def flush(self): + with self._db.get_connection() as conn: + conn.execute(self._flush.format(self.name)) + + +class SqliteDataStore(BaseDataStore): + _create = """ + CREATE TABLE IF NOT EXISTS {0} + ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT, + result BLOB + ) + """ + _put = "INSERT INTO {0} (key, result) VALUES (?, ?)" + _peek = "SELECT result FROM {0} WHERE key = ?" + _remove = "DELETE FROM {0} WHERE key = ?" + _flush = "DELETE FROM {0}" + + def __init__(self, name, location): + super(SqliteDataStore, self).__init__(name, location=location) + self._db = _SqliteDatabase(location) + self.name = 'huey_results_{0}'.format(name) + with self._db.get_connection() as conn: + conn.execute(self._create.format(self.name)) + + def put(self, key, value): + with self._db.get_connection() as conn: + conn.execute(self._remove.format(self.name), (key,)) + conn.execute(self._put.format(self.name), (key, value)) + + def peek(self, key): + with self._db.get_connection() as conn: + try: + return next(conn.execute(self._peek.format(self.name), + (key,)))[0] + except StopIteration: + return EmptyData + + def get(self, key): + with self._db.get_connection() as conn: + try: + data = next(conn.execute(self._peek.format(self.name), + (key,)))[0] + conn.execute(self._remove.format(self.name), (key,)) + return data + except StopIteration: + return EmptyData + + def flush(self): + with self._db.get_connection() as conn: + conn.execute(self._flush.format(self.name)) + + +Components = (SqliteQueue, SqliteDataStore, SqliteSchedule, None) diff --git a/lib/huey/bin/__init__.py b/lib/huey/bin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/huey/bin/huey_consumer.py b/lib/huey/bin/huey_consumer.py new file mode 100755 index 0000000..be53a6b --- /dev/null +++ b/lib/huey/bin/huey_consumer.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +import logging +import optparse +import os +import sys +from logging.handlers import RotatingFileHandler + +from huey.consumer import Consumer +from huey.utils import load_class + + +def err(s): + sys.stderr.write('\033[91m%s\033[0m\n' % s) + + +def get_loglevel(verbose=None): + if verbose is None: + return logging.INFO + elif verbose: + return logging.DEBUG + return logging.ERROR + + +def setup_logger(loglevel, logfile): + log_format = ('%(threadName)s %(asctime)s %(name)s ' + '%(levelname)s %(message)s') + logging.basicConfig(level=loglevel, format=log_format) + + if logfile: + handler = RotatingFileHandler( + logfile, maxBytes=1024*1024, backupCount=3) + handler.setFormatter(logging.Formatter(log_format)) + logging.getLogger().addHandler(handler) + + +def get_option_parser(): + parser = optparse.OptionParser( + 'Usage: %prog [options] path.to.huey_instance') + parser.add_option('-l', '--logfile', dest='logfile', + help='write logs to FILE', metavar='FILE') + parser.add_option('-v', '--verbose', dest='verbose', + help='verbose logging', action='store_true') + parser.add_option('-q', '--quiet', dest='verbose', + help='log exceptions only', action='store_false') + parser.add_option('-w', '--workers', dest='workers', type='int', + help='worker threads (default=1)', default=1) + parser.add_option('-t', '--threads', dest='workers', type='int', + help='same as "workers"', default=1) + parser.add_option('-p', '--periodic', dest='periodic', default=True, + help='execute periodic tasks (default=True)', + action='store_true') + parser.add_option('-n', '--no-periodic', dest='periodic', + help='do NOT execute periodic tasks', + action='store_false') + parser.add_option('-d', '--delay', dest='initial_delay', type='float', + help='initial delay in seconds (default=0.1)', + default=0.1) + parser.add_option('-m', '--max-delay', dest='max_delay', type='float', + help='maximum time to wait between polling the queue ' + '(default=10)', + default=10) + parser.add_option('-b', '--backoff', dest='backoff', type='float', + help='amount to backoff delay when no results present ' + '(default=1.15)', + default=1.15) + parser.add_option('-P', '--periodic-task-interval', + dest='periodic_task_interval', + type='float', help='Granularity of periodic tasks.', + default=60.0) + parser.add_option('-S', '--scheduler-interval', dest='scheduler_interval', + type='float', help='Granularity of scheduler.', + default=1.0) + parser.add_option('-u', '--utc', dest='utc', action='store_true', + help='use UTC time for all tasks (default=True)', + default=True) + parser.add_option('--localtime', dest='utc', action='store_false', + help='use local time for all tasks') + return parser + + +def load_huey(path): + try: + return load_class(path) + except: + cur_dir = os.getcwd() + if cur_dir not in sys.path: + sys.path.insert(0, cur_dir) + return load_huey(path) + err('Error importing %s' % path) + raise + + +def consumer_main(): + parser = get_option_parser() + options, args = parser.parse_args() + + setup_logger(get_loglevel(options.verbose), options.logfile) + + if len(args) == 0: + err('Error: missing import path to `Huey` instance') + err('Example: huey_consumer.py app.queue.huey_instance') + sys.exit(1) + + huey_instance = load_huey(args[0]) + + consumer = Consumer( + huey_instance, + options.workers, + options.periodic, + options.initial_delay, + options.backoff, + options.max_delay, + options.utc, + options.scheduler_interval, + options.periodic_task_interval) + consumer.run() + + +if __name__ == '__main__': + consumer_main() diff --git a/lib/huey/consumer.py b/lib/huey/consumer.py new file mode 100644 index 0000000..2bf4b03 --- /dev/null +++ b/lib/huey/consumer.py @@ -0,0 +1,279 @@ +import datetime +import logging +import signal +import threading +import time + +from huey.exceptions import DataStoreGetException +from huey.exceptions import QueueException +from huey.exceptions import QueueReadException +from huey.exceptions import DataStorePutException +from huey.exceptions import QueueWriteException +from huey.exceptions import ScheduleAddException +from huey.exceptions import ScheduleReadException +from huey.registry import registry + + +class ConsumerThread(threading.Thread): + def __init__(self, huey, utc, shutdown, interval=60): + self.huey = huey + self.utc = utc + self.shutdown = shutdown + self.interval = interval + self._logger = logging.getLogger('huey.consumer.ConsumerThread') + super(ConsumerThread, self).__init__() + + def get_now(self): + if self.utc: + return datetime.datetime.utcnow() + return datetime.datetime.now() + + def on_shutdown(self): + pass + + def loop(self, now): + raise NotImplementedError + + def run(self): + while not self.shutdown.is_set(): + self.loop() + self._logger.debug('Thread shutting down') + self.on_shutdown() + + def enqueue(self, task): + try: + self.huey.enqueue(task) + self.huey.emit_task('enqueued', task) + except QueueWriteException: + self._logger.error('Error enqueueing task: %s' % task) + + def add_schedule(self, task): + try: + self.huey.add_schedule(task) + self.huey.emit_task('scheduled', task) + except ScheduleAddException: + self._logger.error('Error adding task to schedule: %s' % task) + + def is_revoked(self, task, ts): + try: + if self.huey.is_revoked(task, ts, peek=False): + self.huey.emit_task('revoked', task) + return True + return False + except DataStoreGetException: + self._logger.error('Error checking if task is revoked: %s' % task) + return True + + def sleep_for_interval(self, start_ts): + delta = time.time() - start_ts + if delta < self.interval: + time.sleep(self.interval - (time.time() - start_ts)) + + +class PeriodicTaskThread(ConsumerThread): + def loop(self, now=None): + now = now or self.get_now() + self._logger.debug('Checking periodic command registry') + start = time.time() + for task in registry.get_periodic_tasks(): + if task.validate_datetime(now): + self._logger.info('Scheduling %s for execution' % task) + self.enqueue(task) + + self.sleep_for_interval(start) + + +class SchedulerThread(ConsumerThread): + def read_schedule(self, ts): + try: + return self.huey.read_schedule(ts) + except ScheduleReadException: + self._logger.error('Error reading schedule', exc_info=1) + return [] + + def loop(self, now=None): + now = now or self.get_now() + start = time.time() + + for task in self.read_schedule(now): + self._logger.info('Scheduling %s for execution' % task) + self.enqueue(task) + + self.sleep_for_interval(start) + + +class WorkerThread(ConsumerThread): + def __init__(self, huey, default_delay, max_delay, backoff, utc, + shutdown): + self.delay = self.default_delay = default_delay + self.max_delay = max_delay + self.backoff = backoff + self._logger = logging.getLogger('huey.consumer.WorkerThread') + super(WorkerThread, self).__init__(huey, utc, shutdown) + + def loop(self): + self.check_message() + + def check_message(self): + self._logger.debug('Checking for message') + task = exc_raised = None + try: + task = self.huey.dequeue() + except QueueReadException: + self._logger.error('Error reading from queue', exc_info=1) + exc_raised = True + except QueueException: + self._logger.error('Queue exception', exc_info=1) + exc_raised = True + except: + self._logger.error('Unknown exception', exc_info=1) + exc_raised = True + + if task: + self.delay = self.default_delay + self.handle_task(task, self.get_now()) + elif exc_raised or not self.huey.blocking: + self.sleep() + + def sleep(self): + if self.delay > self.max_delay: + self.delay = self.max_delay + + self._logger.debug('No messages, sleeping for: %s' % self.delay) + time.sleep(self.delay) + self.delay *= self.backoff + + def handle_task(self, task, ts): + if not self.huey.ready_to_run(task, ts): + self._logger.info('Adding %s to schedule' % task) + self.add_schedule(task) + elif not self.is_revoked(task, ts): + self.process_task(task, ts) + + def process_task(self, task, ts): + try: + self._logger.info('Executing %s' % task) + self.huey.emit_task('started', task) + self.huey.execute(task) + self.huey.emit_task('finished', task) + except DataStorePutException: + self._logger.warn('Error storing result', exc_info=1) + except: + self._logger.error('Unhandled exception in worker thread', + exc_info=1) + self.huey.emit_task('error', task, error=True) + if task.retries: + self.huey.emit_task('retrying', task) + self.requeue_task(task, self.get_now()) + + def requeue_task(self, task, ts): + task.retries -= 1 + self._logger.info('Re-enqueueing task %s, %s tries left' % + (task.task_id, task.retries)) + if task.retry_delay: + delay = datetime.timedelta(seconds=task.retry_delay) + task.execute_time = ts + delay + self._logger.debug('Execute %s at: %s' % (task, task.execute_time)) + self.add_schedule(task) + else: + self.enqueue(task) + + +class Consumer(object): + def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1, + backoff=1.15, max_delay=10.0, utc=True, scheduler_interval=1, + periodic_task_interval=60): + + self._logger = logging.getLogger('huey.consumer.ConsumerThread') + self.huey = huey + self.workers = workers + self.periodic = periodic + self.default_delay = initial_delay + self.backoff = backoff + self.max_delay = max_delay + self.utc = utc + self.scheduler_interval = scheduler_interval + self.periodic_task_interval = periodic_task_interval + + self.delay = self.default_delay + + self._shutdown = threading.Event() + + def run(self): + try: + self.start() + # it seems that calling self._shutdown.wait() here prevents the + # signal handler from executing + while not self._shutdown.is_set(): + self._shutdown.wait(.1) + except: + self._logger.error('Error', exc_info=1) + self.shutdown() + + def start(self): + self._logger.info('%d worker threads' % self.workers) + + self._set_signal_handler() + self._log_registered_commands() + self._create_threads() + + self._logger.info('Starting scheduler thread') + self.scheduler_t.start() + + self._logger.info('Starting worker threads') + for worker in self.worker_threads: + worker.start() + + if self.periodic: + self._logger.info('Starting periodic task scheduler thread') + self.periodic_t.start() + + def shutdown(self): + self._logger.info('Shutdown initiated') + self._shutdown.set() + + def _handle_signal(self, sig_num, frame): + self._logger.info('Received SIGTERM') + self.shutdown() + + def _set_signal_handler(self): + self._logger.info('Setting signal handler') + signal.signal(signal.SIGTERM, self._handle_signal) + + def _log_registered_commands(self): + msg = ['Huey consumer initialized with following commands'] + for command in registry._registry: + msg.append('+ %s' % command.replace('queuecmd_', '')) + self._logger.info('\n'.join(msg)) + + def _create_threads(self): + self.scheduler_t = SchedulerThread( + self.huey, + self.utc, + self._shutdown, + self.scheduler_interval) + self.scheduler_t.name = 'Scheduler' + + self.worker_threads = [] + for i in range(self.workers): + worker_t = WorkerThread( + self.huey, + self.default_delay, + self.max_delay, + self.backoff, + self.utc, + self._shutdown) + worker_t.daemon = True + worker_t.name = 'Worker %d' % (i + 1) + self.worker_threads.append(worker_t) + + if self.periodic: + self.periodic_t = PeriodicTaskThread( + self.huey, + self.utc, + self._shutdown, + self.periodic_task_interval) + self.periodic_t.daemon = True + self.periodic_t.name = 'Periodic Task' + else: + self.periodic_t = None diff --git a/lib/huey/djhuey/__init__.py b/lib/huey/djhuey/__init__.py new file mode 100644 index 0000000..9aa3765 --- /dev/null +++ b/lib/huey/djhuey/__init__.py @@ -0,0 +1,119 @@ +""" +This module contains a lot of cruft to handle instantiating a "Huey" object +using Django settings. Unlike more flexible python apps, the huey django +integration consists of a single global Huey instance configured via the +settings module. +""" +from functools import wraps +import sys + +from django.conf import settings +from django.db import connection + +from huey import crontab +from huey import Huey +from huey.utils import load_class + + +configuration_message = """ +Configuring Huey for use with Django +==================================== + +Huey was designed to be simple to configure in the general case. For that +reason, huey will "just work" with no configuration at all provided you have +Redis installed and running locally. + +On the other hand, you can configure huey manually using the following +setting structure. The following example uses Redis on localhost: + +Simply point to a backend: + +HUEY = { + 'backend': 'huey.backends.redis_backend', + 'name': 'unique name', + 'connection': {'host': 'localhost', 'port': 6379} + + 'consumer_options': {'workers': 4}, +} + +If you would like to configure Huey's logger using Django's integrated logging +settings, the logger used by consumer is named "huey.consumer". + +For more granular control, you can assign HUEY programmatically: + +HUEY = Huey(RedisBlockingQueue('my-queue')) +""" + +def default_queue_name(): + try: + return settings.DATABASE_NAME + except AttributeError: + return settings.DATABASES['default']['NAME'] + except KeyError: + return 'huey' + +def config_error(msg): + print(configuration_message) + print('\n\n') + print(msg) + sys.exit(1) + +def dynamic_import(obj, key, required=False): + try: + path = obj[key] + except KeyError: + if required: + config_error('Missing required configuration: "%s"' % key) + return None + try: + return load_class(path + '.Components') + except ImportError: + config_error('Unable to import %s: "%s"' % (key, path)) + +try: + HUEY = getattr(settings, 'HUEY', None) +except: + config_error('Error encountered reading settings.HUEY') + +if HUEY is None: + try: + from huey import RedisHuey + except ImportError: + config_error('Error: Huey could not import the redis backend. ' + 'Install `redis-py`.') + HUEY = RedisHuey(default_queue_name()) + +if not isinstance(HUEY, Huey): + Queue, DataStore, Schedule, Events = dynamic_import(HUEY, 'backend') + name = HUEY.get('name') or default_queue_name() + conn = HUEY.get('connection', {}) + always_eager = HUEY.get('always_eager', False) + HUEY = Huey( + Queue(name, **conn), + DataStore and DataStore(name, **conn) or None, + Schedule and Schedule(name, **conn) or None, + Events and Events(name, **conn) or None, + always_eager=always_eager) + +task = HUEY.task +periodic_task = HUEY.periodic_task + +def close_db(fn): + """Decorator to be used with tasks that may operate on the database.""" + @wraps(fn) + def inner(*args, **kwargs): + try: + return fn(*args, **kwargs) + finally: + connection.close() + return inner + +def db_task(*args, **kwargs): + def decorator(fn): + return task(*args, **kwargs)(close_db(fn)) + return decorator + +def db_periodic_task(*args, **kwargs): + def decorator(fn): + return periodic_task(*args, **kwargs)(close_db(fn)) + return decorator diff --git a/lib/huey/djhuey/management/__init__.py b/lib/huey/djhuey/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/huey/djhuey/management/commands/__init__.py b/lib/huey/djhuey/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/huey/djhuey/management/commands/run_huey.py b/lib/huey/djhuey/management/commands/run_huey.py new file mode 100644 index 0000000..4aafb2d --- /dev/null +++ b/lib/huey/djhuey/management/commands/run_huey.py @@ -0,0 +1,126 @@ +import imp +import sys +from optparse import make_option + +from django.conf import settings +from django.core.management.base import BaseCommand +try: + from importlib import import_module +except ImportError: + from django.utils.importlib import import_module + +try: + from django.apps import apps as django_apps + HAS_DJANGO_APPS = True +except ImportError: + # Django 1.6 + HAS_DJANGO_APPS = False + +from huey.consumer import Consumer +from huey.bin.huey_consumer import get_loglevel +from huey.bin.huey_consumer import setup_logger + + +class Command(BaseCommand): + """ + Queue consumer. Example usage:: + + To start the consumer (note you must export the settings module): + + django-admin.py run_huey + """ + help = "Run the queue consumer" + + option_list = BaseCommand.option_list + ( + make_option('--periodic', '-p', + dest='periodic', + action='store_true', + help='Enqueue periodic commands' + ), + make_option('--no-periodic', '-n', + dest='periodic', + action='store_false', + help='Do not enqueue periodic commands' + ), + make_option('--workers', '-w', + dest='workers', + type='int', + help='Number of worker threads' + ), + make_option('--delay', '-d', + dest='initial_delay', + type='float', + help='Delay between polling requests' + ), + make_option('--max_delay', '-m', + dest='max_delay', + type='float', + help='Maximum delay between polling requests' + ), + ) + + def autodiscover_appconfigs(self): + """Use Django app registry to pull out potential apps with tasks.py module.""" + module_name = 'tasks' + for config in django_apps.get_app_configs(): + app_path = config.module.__path__ + try: + fp, path, description = imp.find_module(module_name, app_path) + except ImportError: + continue + else: + import_path = '%s.%s' % (config.name, module_name) + imp.load_module(import_path, fp, path, description) + + def autodiscover_old(self): + # this is to find modules named in a django project's + # installed apps directories + module_name = 'tasks' + + for app in settings.INSTALLED_APPS: + try: + import_module(app) + app_path = sys.modules[app].__path__ + except AttributeError: + continue + try: + imp.find_module(module_name, app_path) + except ImportError: + continue + import_module('%s.%s' % (app, module_name)) + app_path = sys.modules['%s.%s' % (app, module_name)] + + def autodiscover(self): + """Switch between Django 1.7 style and old style app importing.""" + if HAS_DJANGO_APPS: + self.autodiscover_appconfigs() + else: + self.autodiscover_old() + + def handle(self, *args, **options): + from huey.djhuey import HUEY + try: + consumer_options = settings.HUEY['consumer_options'] + except: + consumer_options = {} + + if options['workers'] is not None: + consumer_options['workers'] = options['workers'] + + if options['periodic'] is not None: + consumer_options['periodic'] = options['periodic'] + + if options['initial_delay'] is not None: + consumer_options['initial_delay'] = options['initial_delay'] + + if options['max_delay'] is not None: + consumer_options['max_delay'] = options['max_delay'] + + self.autodiscover() + + loglevel = get_loglevel(consumer_options.pop('loglevel', None)) + logfile = consumer_options.pop('logfile', None) + setup_logger(loglevel, logfile) + + consumer = Consumer(HUEY, **consumer_options) + consumer.run() diff --git a/lib/huey/djhuey/models.py b/lib/huey/djhuey/models.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/huey/exceptions.py b/lib/huey/exceptions.py new file mode 100644 index 0000000..16b0340 --- /dev/null +++ b/lib/huey/exceptions.py @@ -0,0 +1,26 @@ +class QueueException(Exception): + pass + +class QueueWriteException(QueueException): + pass + +class QueueReadException(QueueException): + pass + +class QueueRemoveException(QueueException): + pass + +class DataStoreGetException(QueueException): + pass + +class DataStorePutException(QueueException): + pass + +class DataStoreTimeout(QueueException): + pass + +class ScheduleAddException(QueueException): + pass + +class ScheduleReadException(QueueException): + pass diff --git a/lib/huey/peewee_helpers/__init__.py b/lib/huey/peewee_helpers/__init__.py new file mode 100644 index 0000000..4da1150 --- /dev/null +++ b/lib/huey/peewee_helpers/__init__.py @@ -0,0 +1,20 @@ +from functools import wraps + + +def _transaction(db, fn): + @wraps(fn) + def inner(*args, **kwargs): + # Execute function in its own connection, in a transaction. + with db.execution_context(with_transaction=True): + return fn(*args, **kwargs) + return inner + +def db_task(huey, db, *args, **kwargs): + def decorator(fn): + return huey.task(*args, **kwargs)(_transaction(db, fn)) + return decorator + +def db_periodic_task(huey, db, *args, **kwargs): + def decorator(fn): + return huey.periodic_task(*args, **kwargs)(_transaction(db, fn)) + return decorator diff --git a/lib/huey/registry.py b/lib/huey/registry.py new file mode 100644 index 0000000..0c4e520 --- /dev/null +++ b/lib/huey/registry.py @@ -0,0 +1,77 @@ +import pickle + +from huey.exceptions import QueueException + + +class TaskRegistry(object): + """ + A simple Registry used to track subclasses of :class:`QueueTask` - the + purpose of this registry is to allow translation from queue messages to + task classes, and vice-versa. + """ + _ignore = ['QueueTask', 'PeriodicQueueTask'] + + _registry = {} + _periodic_tasks = [] + + def task_to_string(self, task): + return '%s' % (task.__name__) + + def register(self, task_class): + klass_str = self.task_to_string(task_class) + if klass_str in self._ignore: + return + + if klass_str not in self._registry: + self._registry[klass_str] = task_class + + # store an instance in a separate list of periodic tasks + if hasattr(task_class, 'validate_datetime'): + self._periodic_tasks.append(task_class()) + + def unregister(self, task_class): + klass_str = self.task_to_string(task_class) + + if klass_str in self._registry: + del(self._registry[klass_str]) + + for task in self._periodic_tasks: + if isinstance(task, task_class): + self._periodic_tasks.remove(task) + + def __contains__(self, klass_str): + return klass_str in self._registry + + def get_message_for_task(self, task): + """Convert a task object to a message for storage in the queue""" + return pickle.dumps(( + task.task_id, + self.task_to_string(type(task)), + task.execute_time, + task.retries, + task.retry_delay, + task.get_data(), + )) + + def get_task_class(self, klass_str): + klass = self._registry.get(klass_str) + + if not klass: + raise QueueException('%s not found in TaskRegistry' % klass_str) + + return klass + + def get_task_for_message(self, msg): + """Convert a message from the queue into a task""" + # parse out the pieces from the enqueued message + raw = pickle.loads(msg) + task_id, klass_str, execute_time, retries, delay, data = raw + + klass = self.get_task_class(klass_str) + return klass(data, task_id, execute_time, retries, delay) + + def get_periodic_tasks(self): + return self._periodic_tasks + + +registry = TaskRegistry() diff --git a/lib/huey/tests/__init__.py b/lib/huey/tests/__init__.py new file mode 100644 index 0000000..118aab6 --- /dev/null +++ b/lib/huey/tests/__init__.py @@ -0,0 +1,10 @@ +from huey.tests.backends import * +from huey.tests.consumer import * +from huey.tests.crontab import * +from huey.tests.queue import * +from huey.tests.utils import * +try: + import peewee + from huey.tests.peewee_tests import * +except ImportError: + pass diff --git a/lib/huey/tests/backends.py b/lib/huey/tests/backends.py new file mode 100644 index 0000000..2423783 --- /dev/null +++ b/lib/huey/tests/backends.py @@ -0,0 +1,170 @@ +from collections import deque +import datetime +import os +import sys +import tempfile +import unittest + +from huey.api import Huey +from huey.backends.dummy import DummyDataStore +from huey.backends.dummy import DummyEventEmitter +from huey.backends.dummy import DummyQueue +from huey.backends.dummy import DummySchedule +from huey.utils import EmptyData +from huey.backends.sqlite_backend import SqliteDataStore +from huey.backends.sqlite_backend import SqliteQueue +from huey.backends.sqlite_backend import SqliteSchedule +try: + from huey.backends.redis_backend import RedisDataStore + from huey.backends.redis_backend import RedisEventEmitter + from huey.backends.redis_backend import RedisQueue + from huey.backends.redis_backend import RedisSchedule +except ImportError: + RedisQueue = RedisDataStore = RedisSchedule = RedisEventEmitter = None + +try: + from huey.backends.rabbitmq_backend import RabbitQueue, RabbitEventEmitter +except ImportError: + RabbitQueue = RabbitEventEmitter = None + + +if sys.version_info[0] == 2: + redis_kwargs = {} +else: + redis_kwargs = {'decode_responses': True} + + +QUEUES = (DummyQueue, RedisQueue, SqliteQueue, RabbitQueue) +DATA_STORES = (DummyDataStore, RedisDataStore, SqliteDataStore, None) +SCHEDULES = (DummySchedule, RedisSchedule, SqliteSchedule, None) +EVENTS = (DummyEventEmitter, RedisEventEmitter, None, RabbitEventEmitter) + + +class HueyBackendTestCase(unittest.TestCase): + def setUp(self): + self.sqlite_location = tempfile.mkstemp(prefix='hueytest.')[1] + + def tearDown(self): + os.unlink(self.sqlite_location) + + def test_queues(self): + result_store = DummyDataStore('dummy') + for q in QUEUES: + if not q: + continue + if issubclass(q, SqliteQueue): + queue = q('test', location=self.sqlite_location) + elif issubclass(q, RedisQueue): + queue = q('test', **redis_kwargs) + else: + queue = q('test') + queue.flush() + queue.write('a') + queue.write('b') + self.assertEqual(len(queue), 2) + self.assertEqual(queue.read(), 'a') + self.assertEqual(queue.read(), 'b') + self.assertEqual(queue.read(), None) + + queue.write('c') + queue.write('d') + queue.write('c') + queue.write('x') + queue.write('d') + self.assertEqual(len(queue), 5) + self.assertEqual(queue.remove('c'), 2) + self.assertEqual(len(queue), 3) + self.assertEqual(queue.read(), 'd') + self.assertEqual(queue.read(), 'x') + self.assertEqual(queue.read(), 'd') + + queue.flush() + test_huey = Huey(queue, result_store) + + @test_huey.task() + def test_queues_add(k, v): + return k + v + + res = test_queues_add('k', 'v') + self.assertEqual(len(queue), 1) + task = test_huey.dequeue() + test_huey.execute(task) + self.assertEqual(res.get(), 'kv') + + res = test_queues_add('\xce', '\xcf') + task = test_huey.dequeue() + test_huey.execute(task) + self.assertEqual(res.get(), '\xce\xcf') + + def test_data_stores(self): + for d in DATA_STORES: + if not d: + continue + if issubclass(d, SqliteDataStore): + data_store = d('test', location=self.sqlite_location) + elif issubclass(d, RedisDataStore): + data_store = d('test', **redis_kwargs) + else: + data_store = d('test') + data_store.put('k1', 'v1') + data_store.put('k2', 'v2') + data_store.put('k3', 'v3') + self.assertEqual(data_store.peek('k2'), 'v2') + self.assertEqual(data_store.get('k2'), 'v2') + self.assertEqual(data_store.peek('k2'), EmptyData) + self.assertEqual(data_store.get('k2'), EmptyData) + + self.assertEqual(data_store.peek('k3'), 'v3') + data_store.put('k3', 'v3-2') + self.assertEqual(data_store.peek('k3'), 'v3-2') + + def test_schedules(self): + for s in SCHEDULES: + if not s: + continue + if issubclass(s, SqliteSchedule): + schedule = s('test', location=self.sqlite_location) + elif issubclass(s, RedisSchedule): + schedule = s('test', **redis_kwargs) + else: + schedule = s('test') + dt1 = datetime.datetime(2013, 1, 1, 0, 0) + dt2 = datetime.datetime(2013, 1, 2, 0, 0) + dt3 = datetime.datetime(2013, 1, 3, 0, 0) + dt4 = datetime.datetime(2013, 1, 4, 0, 0) + + # Add to schedule out-of-order to ensure sorting is performed by + # the schedule. + schedule.add('s2', dt2) + schedule.add('s1', dt1) + schedule.add('s4', dt4) + schedule.add('s3', dt3) + + # Ensure that asking for a timestamp previous to any item in the + # schedule returns empty list. + self.assertEqual( + schedule.read(dt1 - datetime.timedelta(days=1)), + []) + + # Ensure the upper boundary is inclusive of whatever timestamp + # is passed in. + self.assertEqual(schedule.read(dt3), ['s1', 's2', 's3']) + self.assertEqual(schedule.read(dt3), []) + + # Ensure the schedule is flushed and an empty schedule returns an + # empty list. + self.assertEqual(schedule.read(dt4), ['s4']) + self.assertEqual(schedule.read(dt4), []) + + def test_events(self): + for e in EVENTS: + if not e: + continue + e = e('test') + + messages = ['a', 'b', 'c', 'd'] + for message in messages: + e.emit(message) + + if hasattr(e, '_events'): + self.assertEqual(e._events, deque(['d', 'c', 'b', 'a'])) diff --git a/lib/huey/tests/consumer.py b/lib/huey/tests/consumer.py new file mode 100644 index 0000000..7f31044 --- /dev/null +++ b/lib/huey/tests/consumer.py @@ -0,0 +1,441 @@ +from collections import deque +import datetime +import json +import logging +import threading +import time +import unittest + +from huey import crontab +from huey import Huey +from huey.backends.dummy import DummyDataStore +from huey.backends.dummy import DummyEventEmitter +from huey.backends.dummy import DummyQueue +from huey.backends.dummy import DummySchedule +from huey.consumer import Consumer +from huey.consumer import WorkerThread +from huey.registry import registry + +# Logger used by the consumer. +logger = logging.getLogger('huey.consumer') + +# Store some global state. +state = {} + +# Create a queue, result store, schedule and event emitter, then attach them +# to a test-only Huey instance. +test_queue = DummyQueue('test-queue') +test_result_store = DummyDataStore('test-queue') +test_schedule = DummySchedule('test-queue') +test_events = DummyEventEmitter('test-queue') +test_huey = Huey(test_queue, test_result_store, test_schedule, test_events) + +# Create some test tasks. +@test_huey.task() +def modify_state(k, v): + state[k] = v + return v + +@test_huey.task() +def blow_up(): + raise Exception('blowed up') + +@test_huey.task(retries=3) +def retry_command(k, always_fail=True): + if k not in state: + if not always_fail: + state[k] = 'fixed' + raise Exception('fappsk') + return state[k] + +@test_huey.task(retries=3, retry_delay=10) +def retry_command_slow(k, always_fail=True): + if k not in state: + if not always_fail: + state[k] = 'fixed' + raise Exception('fappsk') + return state[k] + +@test_huey.periodic_task(crontab(minute='0')) +def every_hour(): + state['p'] = 'y' + + +# Create a log handler that will track messages generated by the consumer. +class TestLogHandler(logging.Handler): + def __init__(self, *args, **kwargs): + self.messages = [] + logging.Handler.__init__(self, *args, **kwargs) + + def emit(self, record): + self.messages.append(record.getMessage()) + + +class ConsumerTestCase(unittest.TestCase): + def setUp(self): + global state + state = {} + + self.orig_pc = registry._periodic_tasks + registry._periodic_commands = [every_hour.task_class()] + + self.orig_sleep = time.sleep + time.sleep = lambda x: None + + test_huey.queue.flush() + test_huey.result_store.flush() + test_huey.schedule.flush() + test_events._events = deque() + + self.consumer = Consumer(test_huey, workers=2) + self.consumer._create_threads() + + self.handler = TestLogHandler() + logger.addHandler(self.handler) + logger.setLevel(logging.INFO) + + def tearDown(self): + self.consumer.shutdown() + logger.removeHandler(self.handler) + registry._periodic_tasks = self.orig_pc + time.sleep = self.orig_sleep + + def assertStatusTask(self, status_task): + parsed = [] + i = 0 + while i < len(status_task): + event = json.loads(test_events._events[i]) + status, task, extra = status_task[i] + self.assertEqual(event['status'], status) + self.assertEqual(event['id'], task.task_id) + for k, v in extra.items(): + self.assertEqual(event[k], v) + i += 1 + + def spawn(self, func, *args, **kwargs): + t = threading.Thread(target=func, args=args, kwargs=kwargs) + t.start() + return t + + def run_worker(self, task, ts=None): + worker_t = WorkerThread( + test_huey, + self.consumer.default_delay, + self.consumer.max_delay, + self.consumer.backoff, + self.consumer.utc, + self.consumer._shutdown) + ts = ts or datetime.datetime.utcnow() + worker_t.handle_task(task, ts) + + def test_message_processing(self): + self.consumer.worker_threads[0].start() + + self.assertFalse('k' in state) + + res = modify_state('k', 'v') + res.get(blocking=True) + + self.assertTrue('k' in state) + self.assertEqual(res.get(), 'v') + + self.assertEqual(len(test_events._events), 2) + self.assertStatusTask([ + ('finished', res.task, {}), + ('started', res.task, {}), + ]) + + def test_worker(self): + modify_state('k', 'w') + task = test_huey.dequeue() + self.run_worker(task) + self.assertEqual(state, {'k': 'w'}) + + def test_worker_exception(self): + blow_up() + task = test_huey.dequeue() + + self.run_worker(task) + self.assertTrue( + 'Unhandled exception in worker thread' in self.handler.messages) + + self.assertEqual(len(test_events._events), 2) + self.assertStatusTask([ + ('error', task, {'error': True}), + ('started', task, {}), + ]) + + def test_retries_and_logging(self): + # this will continually fail + retry_command('blampf') + + for i in reversed(range(4)): + task = test_huey.dequeue() + self.assertEqual(task.retries, i) + self.run_worker(task) + if i > 0: + self.assertEqual( + self.handler.messages[-1], + 'Re-enqueueing task %s, %s tries left' % ( + task.task_id, i - 1)) + self.assertStatusTask([ + ('enqueued', task, {}), + ('retrying', task, {}), + ('error', task,{}), + ('started', task, {}), + ]) + last_idx = -2 + else: + self.assertStatusTask([ + ('error', task,{}), + ('started', task, {}), + ]) + last_idx = -1 + self.assertEqual(self.handler.messages[last_idx], + 'Unhandled exception in worker thread') + + self.assertEqual(test_huey.dequeue(), None) + + def test_retries_with_success(self): + # this will fail once, then succeed + retry_command('blampf', False) + self.assertFalse('blampf' in state) + + task = test_huey.dequeue() + self.run_worker(task) + self.assertEqual(self.handler.messages, [ + 'Executing %s' % task, + 'Unhandled exception in worker thread', + 'Re-enqueueing task %s, 2 tries left' % task.task_id]) + + task = test_huey.dequeue() + self.assertEqual(task.retries, 2) + self.run_worker(task) + + self.assertEqual(state['blampf'], 'fixed') + self.assertEqual(test_huey.dequeue(), None) + + self.assertStatusTask([ + ('finished', task, {}), + ('started', task, {}), + ('enqueued', task, {'retries': 2}), + ('retrying', task, {'retries': 3}), + ('error', task, {'error': True}), + ('started', task, {}), + ]) + + def test_scheduling(self): + dt = datetime.datetime(2011, 1, 1, 0, 0) + dt2 = datetime.datetime(2037, 1, 1, 0, 0) + ad1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False) + ad2 = modify_state.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False) + + # dequeue the past-timestamped task and run it. + worker = self.consumer.worker_threads[0] + worker.check_message() + + self.assertTrue('k' in state) + + # dequeue the future-timestamped task. + worker.check_message() + + # verify the task got stored in the schedule instead of executing + self.assertFalse('k2' in state) + + self.assertStatusTask([ + ('scheduled', ad2.task, {}), + ('finished', ad1.task, {}), + ('started', ad1.task, {}), + ]) + + # run through an iteration of the scheduler + self.consumer.scheduler_t.loop(dt) + + # our command was not enqueued and no events were emitted. + self.assertEqual(len(test_queue._queue), 0) + self.assertEqual(len(test_events._events), 3) + + # run through an iteration of the scheduler + self.consumer.scheduler_t.loop(dt2) + + # our command was enqueued + self.assertEqual(len(test_queue._queue), 1) + self.assertEqual(len(test_events._events), 4) + self.assertStatusTask([ + ('enqueued', ad2.task, {}), + ]) + + def test_retry_scheduling(self): + # this will continually fail + retry_command_slow('blampf') + cur_time = datetime.datetime.utcnow() + + task = test_huey.dequeue() + self.run_worker(task, ts=cur_time) + self.assertEqual(self.handler.messages, [ + 'Executing %s' % task, + 'Unhandled exception in worker thread', + 'Re-enqueueing task %s, 2 tries left' % task.task_id, + ]) + + in_11 = cur_time + datetime.timedelta(seconds=11) + tasks_from_sched = test_huey.read_schedule(in_11) + self.assertEqual(tasks_from_sched, [task]) + + task = tasks_from_sched[0] + self.assertEqual(task.retries, 2) + exec_time = task.execute_time + + self.assertEqual((exec_time - cur_time).seconds, 10) + self.assertStatusTask([ + ('scheduled', task, { + 'retries': 2, + 'retry_delay': 10, + 'execute_time': time.mktime(exec_time.timetuple())}), + ('retrying', task, { + 'retries': 3, + 'retry_delay': 10, + 'execute_time': None}), + ('error', task, {}), + ('started', task, {}), + ]) + + def test_revoking_normal(self): + # enqueue 2 normal commands + r1 = modify_state('k', 'v') + r2 = modify_state('k2', 'v2') + + # revoke the first *before it has been checked* + r1.revoke() + self.assertTrue(test_huey.is_revoked(r1.task)) + self.assertFalse(test_huey.is_revoked(r2.task)) + + # dequeue a *single* message (r1) + task = test_huey.dequeue() + self.run_worker(task) + + self.assertEqual(len(test_events._events), 1) + self.assertStatusTask([ + ('revoked', r1.task, {}), + ]) + + # no changes and the task was not added to the schedule + self.assertFalse('k' in state) + + # dequeue a *single* message + task = test_huey.dequeue() + self.run_worker(task) + + self.assertTrue('k2' in state) + + def test_revoking_schedule(self): + global state + dt = datetime.datetime(2011, 1, 1) + dt2 = datetime.datetime(2037, 1, 1) + + r1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False) + r2 = modify_state.schedule(args=('k2', 'v2'), eta=dt, convert_utc=False) + r3 = modify_state.schedule(args=('k3', 'v3'), eta=dt2, convert_utc=False) + r4 = modify_state.schedule(args=('k4', 'v4'), eta=dt2, convert_utc=False) + + # revoke r1 and r3 + r1.revoke() + r3.revoke() + self.assertTrue(test_huey.is_revoked(r1.task)) + self.assertFalse(test_huey.is_revoked(r2.task)) + self.assertTrue(test_huey.is_revoked(r3.task)) + self.assertFalse(test_huey.is_revoked(r4.task)) + + expected = [ + #state, schedule + ({}, 0), + ({'k2': 'v2'}, 0), + ({'k2': 'v2'}, 1), + ({'k2': 'v2'}, 2), + ] + + for i in range(4): + estate, esc = expected[i] + + # dequeue a *single* message + task = test_huey.dequeue() + self.run_worker(task) + + self.assertEqual(state, estate) + self.assertEqual(len(test_huey.schedule._schedule), esc) + + # lets pretend its 2037 + future = dt2 + datetime.timedelta(seconds=1) + self.consumer.scheduler_t.loop(future) + self.assertEqual(len(test_huey.schedule._schedule), 0) + + # There are two tasks in the queue now (r3 and r4) -- process both. + for i in range(2): + task = test_huey.dequeue() + self.run_worker(task, future) + + self.assertEqual(state, {'k2': 'v2', 'k4': 'v4'}) + + def test_revoking_periodic(self): + global state + def loop_periodic(ts): + self.consumer.periodic_t.loop(ts) + for i in range(len(test_queue._queue)): + task = test_huey.dequeue() + self.run_worker(task, ts) + + # revoke the command once + every_hour.revoke(revoke_once=True) + self.assertTrue(every_hour.is_revoked()) + + # it will be skipped the first go-round + dt = datetime.datetime(2011, 1, 1, 0, 0) + loop_periodic(dt) + + # it has not been run + self.assertEqual(state, {}) + + # the next go-round it will be enqueued + loop_periodic(dt) + + # our command was run + self.assertEqual(state, {'p': 'y'}) + + # reset state + state = {} + + # revoke the command + every_hour.revoke() + self.assertTrue(every_hour.is_revoked()) + + # it will no longer be enqueued + loop_periodic(dt) + loop_periodic(dt) + self.assertEqual(state, {}) + + # restore + every_hour.restore() + self.assertFalse(every_hour.is_revoked()) + + # it will now be enqueued + loop_periodic(dt) + self.assertEqual(state, {'p': 'y'}) + + # reset + state = {} + + # revoke for an hour + td = datetime.timedelta(seconds=3600) + every_hour.revoke(revoke_until=dt + td) + + loop_periodic(dt) + self.assertEqual(state, {}) + + # after an hour it is back + loop_periodic(dt + td) + self.assertEqual(state, {'p': 'y'}) + + # our data store should reflect the delay + task_obj = every_hour.task_class() + self.assertEqual(len(test_huey.result_store._results), 1) + self.assertTrue(task_obj.revoke_id in test_huey.result_store._results) diff --git a/lib/huey/tests/crontab.py b/lib/huey/tests/crontab.py new file mode 100644 index 0000000..8c603ab --- /dev/null +++ b/lib/huey/tests/crontab.py @@ -0,0 +1,91 @@ +import datetime +import unittest + +from huey import crontab + + +class CrontabTestCase(unittest.TestCase): + def test_crontab_month(self): + # validates the following months, 1, 4, 7, 8, 9 + valids = [1, 4, 7, 8, 9] + validate_m = crontab(month='1,4,*/6,8-9') + + for x in range(1, 13): + res = validate_m(datetime.datetime(2011, x, 1)) + self.assertEqual(res, x in valids) + + def test_crontab_day(self): + # validates the following days + valids = [1, 4, 7, 8, 9, 13, 19, 25, 31] + validate_d = crontab(day='*/6,1,4,8-9') + + for x in range(1, 32): + res = validate_d(datetime.datetime(2011, 1, x)) + self.assertEqual(res, x in valids) + + def test_crontab_hour(self): + # validates the following hours + valids = [0, 1, 4, 6, 8, 9, 12, 18] + validate_h = crontab(hour='8-9,*/6,1,4') + + for x in range(24): + res = validate_h(datetime.datetime(2011, 1, 1, x)) + self.assertEqual(res, x in valids) + + edge = crontab(hour=0) + self.assertTrue(edge(datetime.datetime(2011, 1, 1, 0, 0))) + self.assertFalse(edge(datetime.datetime(2011, 1, 1, 12, 0))) + + def test_crontab_minute(self): + # validates the following minutes + valids = [0, 1, 4, 6, 8, 9, 12, 18, 24, 30, 36, 42, 48, 54] + validate_m = crontab(minute='4,8-9,*/6,1') + + for x in range(60): + res = validate_m(datetime.datetime(2011, 1, 1, 1, x)) + self.assertEqual(res, x in valids) + + def test_crontab_day_of_week(self): + # validates the following days of week + # jan, 1, 2011 is a saturday + valids = [2, 4, 9, 11, 16, 18, 23, 25, 30] + validate_dow = crontab(day_of_week='0,2') + + for x in range(1, 32): + res = validate_dow(datetime.datetime(2011, 1, x)) + self.assertEqual(res, x in valids) + + def test_crontab_all_together(self): + # jan 1, 2011 is a saturday + # may 1, 2011 is a sunday + validate = crontab( + month='1,5', + day='1,4,7', + day_of_week='0,6', + hour='*/4', + minute='1-5,10-15,50' + ) + + self.assertTrue(validate(datetime.datetime(2011, 5, 1, 4, 11))) + self.assertTrue(validate(datetime.datetime(2011, 5, 7, 20, 50))) + self.assertTrue(validate(datetime.datetime(2011, 1, 1, 0, 1))) + + # fails validation on month + self.assertFalse(validate(datetime.datetime(2011, 6, 4, 4, 11))) + + # fails validation on day + self.assertFalse(validate(datetime.datetime(2011, 1, 6, 4, 11))) + + # fails validation on day_of_week + self.assertFalse(validate(datetime.datetime(2011, 1, 4, 4, 11))) + + # fails validation on hour + self.assertFalse(validate(datetime.datetime(2011, 1, 1, 1, 11))) + + # fails validation on minute + self.assertFalse(validate(datetime.datetime(2011, 1, 1, 4, 6))) + + def test_invalid_crontabs(self): + # check invalid configurations are detected and reported + self.assertRaises(ValueError, crontab, minute='61') + self.assertRaises(ValueError, crontab, minute='0-61') diff --git a/lib/huey/tests/peewee_tests.py b/lib/huey/tests/peewee_tests.py new file mode 100644 index 0000000..9ed66c1 --- /dev/null +++ b/lib/huey/tests/peewee_tests.py @@ -0,0 +1,62 @@ +from contextlib import contextmanager +import unittest + +from huey import Huey +from huey.backends.dummy import DummyDataStore +from huey.backends.dummy import DummyQueue +from huey.backends.dummy import DummySchedule +from huey.peewee_helpers import db_periodic_task +from huey.peewee_helpers import db_task +from peewee import * + + +queue = DummyQueue('test-queue') +schedule = DummySchedule('test-queue') +data_store = DummyDataStore('test-queue') +huey = Huey(queue, data_store, schedule=schedule) + +STATE = [] + +class MockSqliteDatabase(SqliteDatabase): + def record_call(fn): + def inner(*args, **kwargs): + STATE.append(fn.__name__) + return fn(*args, **kwargs) + return inner + connect = record_call(SqliteDatabase.connect) + _close = record_call(SqliteDatabase._close) + transaction = record_call(SqliteDatabase.transaction) + +db = MockSqliteDatabase('test.huey.db') + +class Value(Model): + data = CharField() + + class Meta: + database = db + + @classmethod + def create(cls, *args, **kwargs): + STATE.append('create') + return super(Value, cls).create(*args, **kwargs) + +@db_task(huey, db) +def test_db_task(val): + return Value.create(data=val) + +class TestPeeweeHelpers(unittest.TestCase): + def setUp(self): + global STATE + STATE = [] + queue.flush() + data_store.flush() + schedule.flush() + Value.drop_table(True) + Value.create_table() + + def test_helper(self): + test_db_task('foo') + self.assertEqual(STATE, ['connect']) + huey.execute(huey.dequeue()) + self.assertEqual(STATE, ['connect', 'transaction', 'create', '_close']) + self.assertEqual(Value.select().count(), 1) diff --git a/lib/huey/tests/queue.py b/lib/huey/tests/queue.py new file mode 100644 index 0000000..b66d0ab --- /dev/null +++ b/lib/huey/tests/queue.py @@ -0,0 +1,438 @@ +import datetime +import unittest + +from huey import crontab +from huey import exceptions as huey_exceptions +from huey import Huey +from huey.api import QueueTask +from huey.backends.dummy import DummyDataStore +from huey.backends.dummy import DummyQueue +from huey.backends.dummy import DummySchedule +from huey.registry import registry +from huey.utils import EmptyData +from huey.utils import local_to_utc + + +queue_name = 'test-queue' +queue = DummyQueue(queue_name) +schedule = DummySchedule(queue_name) +huey = Huey(queue, schedule=schedule) + +res_queue_name = 'test-queue-2' +res_queue = DummyQueue(res_queue_name) +res_store = DummyDataStore(res_queue_name) + +res_huey = Huey(res_queue, res_store, schedule) +res_huey_nones = Huey(res_queue, res_store, store_none=True) + +# store some global state +state = {} +last_executed_task_class = [] + +# create a decorated queue command +@huey.task() +def add(key, value): + state[key] = value + +@huey.task(include_task=True) +def self_aware(key, value, task=None): + last_executed_task_class.append(task.__class__.__name__) + +# create a periodic queue command +@huey.periodic_task(crontab(minute='0')) +def add_on_the_hour(): + state['periodic'] = 'x' + +# define a command using the class +class AddTask(QueueTask): + def execute(self): + k, v = self.data + state[k] = v + +# create a command that raises an exception +class BampfException(Exception): + pass + +@huey.task() +def throw_error(): + raise BampfException('bampf') + +@res_huey.task() +def add2(a, b): + return a + b + +@res_huey.periodic_task(crontab(minute='0')) +def add_on_the_hour2(): + state['periodic'] = 'x' + +@res_huey.task() +def returns_none(): + return None + +@res_huey_nones.task() +def returns_none2(): + return None + + +class HueyTestCase(unittest.TestCase): + def setUp(self): + global state + global last_executed_task_class + queue.flush() + res_queue.flush() + schedule.flush() + state = {} + last_executed_task_class = [] + + def test_registration(self): + self.assertTrue('queuecmd_add' in registry) + self.assertTrue('queuecmd_add_on_the_hour' in registry) + self.assertTrue('AddTask' in registry) + + def test_enqueue(self): + # sanity check + self.assertEqual(len(queue), 0) + + # initializing the command does not enqueue it + ac = AddTask(('k', 'v')) + self.assertEqual(len(queue), 0) + + # ok, enqueue it, then check that it was enqueued + huey.enqueue(ac) + self.assertEqual(len(queue), 1) + + # it can be enqueued multiple times + huey.enqueue(ac) + self.assertEqual(len(queue), 2) + + # no changes to state + self.assertFalse('k' in state) + + def test_enqueue_decorator(self): + # sanity check + self.assertEqual(len(queue), 0) + + add('k', 'v') + self.assertEqual(len(queue), 1) + + add('k', 'v') + self.assertEqual(len(queue), 2) + + # no changes to state + self.assertFalse('k' in state) + + def test_schedule(self): + dt = datetime.datetime(2011, 1, 1, 0, 1) + add('k', 'v') + self.assertEqual(len(queue), 1) + + task = huey.dequeue() + self.assertEqual(task.execute_time, None) + + add.schedule(args=('k2', 'v2'), eta=dt) + self.assertEqual(len(queue), 1) + task = huey.dequeue() + self.assertEqual(task.execute_time, local_to_utc(dt)) + + add.schedule(args=('k3', 'v3'), eta=dt, convert_utc=False) + self.assertEqual(len(queue), 1) + task = huey.dequeue() + self.assertEqual(task.execute_time, dt) + + def test_error_raised(self): + throw_error() + + # no error + task = huey.dequeue() + + # error + self.assertRaises(BampfException, huey.execute, task) + + def test_internal_error(self): + """ + Verify that exceptions are wrapped with the special "huey" + exception classes. + """ + class SpecialException(Exception): + pass + + class BrokenQueue(DummyQueue): + def read(self): + raise SpecialException('read error') + + def write(self, data): + raise SpecialException('write error') + + class BrokenDataStore(DummyDataStore): + def get(self, key): + raise SpecialException('get error') + + def put(self, key, value): + raise SpecialException('put error') + + class BrokenSchedule(DummySchedule): + def add(self, data, ts): + raise SpecialException('add error') + + def read(self, ts): + raise SpecialException('read error') + + task = AddTask() + huey = Huey( + BrokenQueue('q'), + BrokenDataStore('q'), + BrokenSchedule('q')) + + self.assertRaises( + huey_exceptions.QueueWriteException, + huey.enqueue, + AddTask()) + self.assertRaises( + huey_exceptions.QueueReadException, + huey.dequeue) + self.assertRaises( + huey_exceptions.DataStorePutException, + huey.revoke, + task) + self.assertRaises( + huey_exceptions.DataStoreGetException, + huey.restore, + task) + self.assertRaises( + huey_exceptions.ScheduleAddException, + huey.add_schedule, + task) + self.assertRaises( + huey_exceptions.ScheduleReadException, + huey.read_schedule, + 1) + + def test_dequeueing(self): + res = huey.dequeue() # no error raised if queue is empty + self.assertEqual(res, None) + + add('k', 'v') + task = huey.dequeue() + + self.assertTrue(isinstance(task, QueueTask)) + self.assertEqual(task.get_data(), (('k', 'v'), {})) + + def test_execution(self): + self.assertFalse('k' in state) + add('k', 'v') + + task = huey.dequeue() + self.assertFalse('k' in state) + + huey.execute(task) + self.assertEqual(state['k'], 'v') + + add('k', 'X') + self.assertEqual(state['k'], 'v') + + huey.execute(huey.dequeue()) + self.assertEqual(state['k'], 'X') + + self.assertRaises(TypeError, huey.execute, huey.dequeue()) + + def test_self_awareness(self): + self_aware('k', 'v') + task = huey.dequeue() + huey.execute(task) + self.assertEqual(last_executed_task_class.pop(), "queuecmd_self_aware") + + self_aware('k', 'v') + huey.execute(huey.dequeue()) + self.assertEqual(last_executed_task_class.pop(), "queuecmd_self_aware") + + add('k', 'x') + huey.execute(huey.dequeue()) + self.assertEqual(len(last_executed_task_class), 0) + + def test_call_local(self): + self.assertEqual(len(queue), 0) + self.assertEqual(state, {}) + add.call_local('nugget', 'green') + + self.assertEqual(len(queue), 0) + self.assertEqual(state['nugget'], 'green') + + def test_revoke(self): + ac = AddTask(('k', 'v')) + ac2 = AddTask(('k2', 'v2')) + ac3 = AddTask(('k3', 'v3')) + + res_huey.enqueue(ac) + res_huey.enqueue(ac2) + res_huey.enqueue(ac3) + res_huey.enqueue(ac2) + res_huey.enqueue(ac) + + self.assertEqual(len(res_queue), 5) + res_huey.revoke(ac2) + + while res_queue: + task = res_huey.dequeue() + if not res_huey.is_revoked(task): + res_huey.execute(task) + + self.assertEqual(state, {'k': 'v', 'k3': 'v3'}) + + def test_revoke_periodic(self): + add_on_the_hour2.revoke() + self.assertTrue(add_on_the_hour2.is_revoked()) + + # it is still revoked + self.assertTrue(add_on_the_hour2.is_revoked()) + + add_on_the_hour2.restore() + self.assertFalse(add_on_the_hour2.is_revoked()) + + add_on_the_hour2.revoke(revoke_once=True) + self.assertTrue(add_on_the_hour2.is_revoked()) # it is revoked once, but we are preserving that state + self.assertTrue(add_on_the_hour2.is_revoked(peek=False)) # is revoked once, but clear state + self.assertFalse(add_on_the_hour2.is_revoked()) # no longer revoked + + d = datetime.datetime + add_on_the_hour2.revoke(revoke_until=d(2011, 1, 1, 11, 0)) + self.assertTrue(add_on_the_hour2.is_revoked(dt=d(2011, 1, 1, 10, 0))) + self.assertTrue(add_on_the_hour2.is_revoked(dt=d(2011, 1, 1, 10, 59))) + self.assertFalse(add_on_the_hour2.is_revoked(dt=d(2011, 1, 1, 11, 0))) + + add_on_the_hour2.restore() + self.assertFalse(add_on_the_hour2.is_revoked()) + + def test_result_store(self): + res = add2(1, 2) + res2 = add2(4, 5) + res3 = add2(0, 0) + + # none have been executed as yet + self.assertEqual(res.get(), None) + self.assertEqual(res2.get(), None) + self.assertEqual(res3.get(), None) + + # execute the first task + res_huey.execute(res_huey.dequeue()) + self.assertEqual(res.get(), 3) + self.assertEqual(res2.get(), None) + self.assertEqual(res3.get(), None) + + # execute the second task + res_huey.execute(res_huey.dequeue()) + self.assertEqual(res.get(), 3) + self.assertEqual(res2.get(), 9) + self.assertEqual(res3.get(), None) + + # execute the 3rd, which returns a zero value + res_huey.execute(res_huey.dequeue()) + self.assertEqual(res.get(), 3) + self.assertEqual(res2.get(), 9) + self.assertEqual(res3.get(), 0) + + # check that it returns None when nothing is present + res = returns_none() + self.assertEqual(res.get(), None) + + # execute, it will still return None, but underneath it is an EmptyResult + # indicating its actual result was not persisted + res_huey.execute(res_huey.dequeue()) + self.assertEqual(res.get(), None) + self.assertEqual(res._result, EmptyData) + + # execute again, this time note that we're pointing at the invoker + # that *does* accept None as a store-able result + res = returns_none2() + self.assertEqual(res.get(), None) + + # it stores None + res_huey_nones.execute(res_huey_nones.dequeue()) + self.assertEqual(res.get(), None) + self.assertEqual(res._result, None) + + def test_task_store(self): + dt1 = datetime.datetime(2011, 1, 1, 0, 0) + dt2 = datetime.datetime(2035, 1, 1, 0, 0) + + add2.schedule(args=('k', 'v'), eta=dt1, convert_utc=False) + task1 = res_huey.dequeue() + + add2.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False) + task2 = res_huey.dequeue() + + add2('k3', 'v3') + task3 = res_huey.dequeue() + + # add the command to the schedule + res_huey.add_schedule(task1) + self.assertEqual(len(res_huey.schedule._schedule), 1) + + # add a future-dated command + res_huey.add_schedule(task2) + self.assertEqual(len(res_huey.schedule._schedule), 2) + + res_huey.add_schedule(task3) + + tasks = res_huey.read_schedule(dt1) + self.assertEqual(tasks, [task3, task1]) + + tasks = res_huey.read_schedule(dt1) + self.assertEqual(tasks, []) + + tasks = res_huey.read_schedule(dt2) + self.assertEqual(tasks, [task2]) + + def test_ready_to_run_method(self): + dt1 = datetime.datetime(2011, 1, 1, 0, 0) + dt2 = datetime.datetime(2035, 1, 1, 0, 0) + + add2.schedule(args=('k', 'v'), eta=dt1) + task1 = res_huey.dequeue() + + add2.schedule(args=('k2', 'v2'), eta=dt2) + task2 = res_huey.dequeue() + + add2('k3', 'v3') + task3 = res_huey.dequeue() + + add2.schedule(args=('k4', 'v4'), task_id='test_task_id') + task4 = res_huey.dequeue() + + # sanity check what should be run + self.assertTrue(res_huey.ready_to_run(task1)) + self.assertFalse(res_huey.ready_to_run(task2)) + self.assertTrue(res_huey.ready_to_run(task3)) + self.assertTrue(res_huey.ready_to_run(task4)) + self.assertEqual('test_task_id', task4.task_id) + + def test_task_delay(self): + curr = datetime.datetime.utcnow() + curr50 = curr + datetime.timedelta(seconds=50) + curr70 = curr + datetime.timedelta(seconds=70) + + add2.schedule(args=('k', 'v'), delay=60) + task1 = res_huey.dequeue() + + add2.schedule(args=('k2', 'v2'), delay=600) + task2 = res_huey.dequeue() + + add2('k3', 'v3') + task3 = res_huey.dequeue() + + # add the command to the schedule + res_huey.add_schedule(task1) + res_huey.add_schedule(task2) + res_huey.add_schedule(task3) + + # sanity check what should be run + self.assertFalse(res_huey.ready_to_run(task1)) + self.assertFalse(res_huey.ready_to_run(task2)) + self.assertTrue(res_huey.ready_to_run(task3)) + + self.assertFalse(res_huey.ready_to_run(task1, curr50)) + self.assertFalse(res_huey.ready_to_run(task2, curr50)) + self.assertTrue(res_huey.ready_to_run(task3, curr50)) + + self.assertTrue(res_huey.ready_to_run(task1, curr70)) + self.assertFalse(res_huey.ready_to_run(task2, curr70)) + self.assertTrue(res_huey.ready_to_run(task3, curr70)) diff --git a/lib/huey/tests/utils.py b/lib/huey/tests/utils.py new file mode 100644 index 0000000..3489932 --- /dev/null +++ b/lib/huey/tests/utils.py @@ -0,0 +1,24 @@ +import unittest + +from huey.utils import wrap_exception + + +class MyException(Exception): + pass + + +class TestWrapException(unittest.TestCase): + def test_wrap_exception(self): + def raise_keyerror(): + try: + {}['huey'] + except KeyError as exc: + raise wrap_exception(MyException) + + self.assertRaises(MyException, raise_keyerror) + try: + raise_keyerror() + except MyException as exc: + self.assertEqual(str(exc), "KeyError: 'huey'") + else: + assert False diff --git a/lib/huey/utils.py b/lib/huey/utils.py new file mode 100644 index 0000000..747d8da --- /dev/null +++ b/lib/huey/utils.py @@ -0,0 +1,21 @@ +import datetime +import sys +import time + + +class EmptyData(object): + pass + + +def load_class(s): + path, klass = s.rsplit('.', 1) + __import__(path) + mod = sys.modules[path] + return getattr(mod, klass) + +def wrap_exception(new_exc_class): + exc_class, exc, tb = sys.exc_info() + raise new_exc_class('%s: %s' % (exc_class.__name__, exc)) + +def local_to_utc(dt): + return datetime.datetime(*time.gmtime(time.mktime(dt.timetuple()))[:6]) diff --git a/rpm/build.sh b/rpm/build.sh new file mode 100755 index 0000000..ee20430 --- /dev/null +++ b/rpm/build.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +set -ex +rm -rf ./tis-tisbackup/ ./BUILD *.rpm ./RPMS +mkdir -p BUILD RPMS + +VERSION=`git rev-list HEAD --count` +echo $VERSION > __VERSION__ + +rpmbuild -bb --buildroot $PWD/builddir -v --clean tis-tisbackup.spec +cp RPMS/*/*.rpm . + diff --git a/rpm/tis-tisbackup.spec b/rpm/tis-tisbackup.spec new file mode 100644 index 0000000..2feb3a1 --- /dev/null +++ b/rpm/tis-tisbackup.spec @@ -0,0 +1,54 @@ +%define _topdir . +%define buildroot ./builddir +%define VERSION %(cat __VERSION__) + +Name: tis-tisbackup +Version: %{VERSION} +Release: 1%{?dist} +Summary: TisBackup backup manager +BuildArch: x86_64 + +Group: System Environment/Daemons +License: GPL +URL: http://dev.tranquil.it +Source0: ../ +Prefix: / + +Requires: unzip rsync python-paramiko python-pyvmomi python-pip nfs-utils python-flask python-setuptools python-simplejson autofs pexpect + +# Turn off the brp-python-bytecompile script +#%global __os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-python-bytecompile[[:space:]].*$!!g') + +%description + +%install +set -ex + +mkdir -p %{buildroot}/opt/tisbackup/ +mkdir -p %{buildroot}/usr/lib/systemd/system/ +mkdir -p %{buildroot}/etc/cron.d/ +mkdir -p %{buildroot}/etc/tis + +rsync --exclude="rpm" --exclude=".git" -aP ../../../tisbackup/ %{buildroot}/opt/tisbackup/ +rsync -aP ../../../tisbackup/scripts/tisbackup_gui.service %{buildroot}/usr/lib/systemd/system/ +rsync -aP ../../../tisbackup/scripts/tisbackup_huey.service %{buildroot}/usr/lib/systemd/system/ +rsync -aP ../../../tisbackup/samples/tisbackup.cron %{buildroot}/etc/cron.d/ +rsync -aP ../../../tisbackup/samples/tisbackup_gui.ini %{buildroot}/etc/tis +rsync -aP ../../../tisbackup/samples/config.ini.sample %{buildroot}/etc/tis/tisbackup-config.ini.sample + +%files +%defattr(-,root,root) +%attr(-,root,root)/opt/tisbackup/ +%attr(-,root,root)/usr/lib/systemd/system/ +%attr(-,root,root)/etc/tis +%attr(-,root,root)/etc/cron.d/ + +%pre + + +%post +#[ -f /etc/dhcp/reservations.conf ] || touch /etc/dhcp/reservations.conf +#[ -f /etc/dhcp/reservations.conf.disabled ] || touch /etc/dhcp/reservations.conf.disabled +#[ -f /opt/tis-dhcpmanager/config.ini ] || cp /opt/tis-dhcpmanager/config.ini.sample /opt/tis-dhcpmanager/config.ini +#[ -f /etc/dhcp/reservations.conf ] && sed -i 's/{/{\n/;s/;/;\n/g;' /etc/dhcp/reservations.conf && sed -i '/^$/d' /etc/dhcp/reservations.conf + diff --git a/scripts/tisbackup_gui.service b/scripts/tisbackup_gui.service new file mode 100644 index 0000000..fb76968 --- /dev/null +++ b/scripts/tisbackup_gui.service @@ -0,0 +1,9 @@ +[Unit] +Description=tisbackup + +[Service] +Type=simple +ExecStart=/usr/bin/python2 /opt/tisbackup/tisbackup_gui.py + +[Install] +WantedBy=multi-user.target diff --git a/scripts/tisbackup_huey.service b/scripts/tisbackup_huey.service new file mode 100644 index 0000000..54cde81 --- /dev/null +++ b/scripts/tisbackup_huey.service @@ -0,0 +1,9 @@ +[Unit] +Description=tisbackup + +[Service] +Type=simple +ExecStart=/opt/tisbackup/huey_consumer.py -n tisbackup_gui.huey +WorkingDirectory=/opt/tisbackup +[Install] +WantedBy=multi-user.target