442 lines
13 KiB
Python
442 lines
13 KiB
Python
|
from collections import deque
|
||
|
import datetime
|
||
|
import json
|
||
|
import logging
|
||
|
import threading
|
||
|
import time
|
||
|
import unittest
|
||
|
|
||
|
from huey import crontab
|
||
|
from huey import Huey
|
||
|
from huey.backends.dummy import DummyDataStore
|
||
|
from huey.backends.dummy import DummyEventEmitter
|
||
|
from huey.backends.dummy import DummyQueue
|
||
|
from huey.backends.dummy import DummySchedule
|
||
|
from huey.consumer import Consumer
|
||
|
from huey.consumer import WorkerThread
|
||
|
from huey.registry import registry
|
||
|
|
||
|
# Logger used by the consumer.
|
||
|
logger = logging.getLogger('huey.consumer')
|
||
|
|
||
|
# Store some global state.
|
||
|
state = {}
|
||
|
|
||
|
# Create a queue, result store, schedule and event emitter, then attach them
|
||
|
# to a test-only Huey instance.
|
||
|
test_queue = DummyQueue('test-queue')
|
||
|
test_result_store = DummyDataStore('test-queue')
|
||
|
test_schedule = DummySchedule('test-queue')
|
||
|
test_events = DummyEventEmitter('test-queue')
|
||
|
test_huey = Huey(test_queue, test_result_store, test_schedule, test_events)
|
||
|
|
||
|
# Create some test tasks.
|
||
|
@test_huey.task()
|
||
|
def modify_state(k, v):
|
||
|
state[k] = v
|
||
|
return v
|
||
|
|
||
|
@test_huey.task()
|
||
|
def blow_up():
|
||
|
raise Exception('blowed up')
|
||
|
|
||
|
@test_huey.task(retries=3)
|
||
|
def retry_command(k, always_fail=True):
|
||
|
if k not in state:
|
||
|
if not always_fail:
|
||
|
state[k] = 'fixed'
|
||
|
raise Exception('fappsk')
|
||
|
return state[k]
|
||
|
|
||
|
@test_huey.task(retries=3, retry_delay=10)
|
||
|
def retry_command_slow(k, always_fail=True):
|
||
|
if k not in state:
|
||
|
if not always_fail:
|
||
|
state[k] = 'fixed'
|
||
|
raise Exception('fappsk')
|
||
|
return state[k]
|
||
|
|
||
|
@test_huey.periodic_task(crontab(minute='0'))
|
||
|
def every_hour():
|
||
|
state['p'] = 'y'
|
||
|
|
||
|
|
||
|
# Create a log handler that will track messages generated by the consumer.
|
||
|
class TestLogHandler(logging.Handler):
|
||
|
def __init__(self, *args, **kwargs):
|
||
|
self.messages = []
|
||
|
logging.Handler.__init__(self, *args, **kwargs)
|
||
|
|
||
|
def emit(self, record):
|
||
|
self.messages.append(record.getMessage())
|
||
|
|
||
|
|
||
|
class ConsumerTestCase(unittest.TestCase):
|
||
|
def setUp(self):
|
||
|
global state
|
||
|
state = {}
|
||
|
|
||
|
self.orig_pc = registry._periodic_tasks
|
||
|
registry._periodic_commands = [every_hour.task_class()]
|
||
|
|
||
|
self.orig_sleep = time.sleep
|
||
|
time.sleep = lambda x: None
|
||
|
|
||
|
test_huey.queue.flush()
|
||
|
test_huey.result_store.flush()
|
||
|
test_huey.schedule.flush()
|
||
|
test_events._events = deque()
|
||
|
|
||
|
self.consumer = Consumer(test_huey, workers=2)
|
||
|
self.consumer._create_threads()
|
||
|
|
||
|
self.handler = TestLogHandler()
|
||
|
logger.addHandler(self.handler)
|
||
|
logger.setLevel(logging.INFO)
|
||
|
|
||
|
def tearDown(self):
|
||
|
self.consumer.shutdown()
|
||
|
logger.removeHandler(self.handler)
|
||
|
registry._periodic_tasks = self.orig_pc
|
||
|
time.sleep = self.orig_sleep
|
||
|
|
||
|
def assertStatusTask(self, status_task):
|
||
|
parsed = []
|
||
|
i = 0
|
||
|
while i < len(status_task):
|
||
|
event = json.loads(test_events._events[i])
|
||
|
status, task, extra = status_task[i]
|
||
|
self.assertEqual(event['status'], status)
|
||
|
self.assertEqual(event['id'], task.task_id)
|
||
|
for k, v in extra.items():
|
||
|
self.assertEqual(event[k], v)
|
||
|
i += 1
|
||
|
|
||
|
def spawn(self, func, *args, **kwargs):
|
||
|
t = threading.Thread(target=func, args=args, kwargs=kwargs)
|
||
|
t.start()
|
||
|
return t
|
||
|
|
||
|
def run_worker(self, task, ts=None):
|
||
|
worker_t = WorkerThread(
|
||
|
test_huey,
|
||
|
self.consumer.default_delay,
|
||
|
self.consumer.max_delay,
|
||
|
self.consumer.backoff,
|
||
|
self.consumer.utc,
|
||
|
self.consumer._shutdown)
|
||
|
ts = ts or datetime.datetime.utcnow()
|
||
|
worker_t.handle_task(task, ts)
|
||
|
|
||
|
def test_message_processing(self):
|
||
|
self.consumer.worker_threads[0].start()
|
||
|
|
||
|
self.assertFalse('k' in state)
|
||
|
|
||
|
res = modify_state('k', 'v')
|
||
|
res.get(blocking=True)
|
||
|
|
||
|
self.assertTrue('k' in state)
|
||
|
self.assertEqual(res.get(), 'v')
|
||
|
|
||
|
self.assertEqual(len(test_events._events), 2)
|
||
|
self.assertStatusTask([
|
||
|
('finished', res.task, {}),
|
||
|
('started', res.task, {}),
|
||
|
])
|
||
|
|
||
|
def test_worker(self):
|
||
|
modify_state('k', 'w')
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task)
|
||
|
self.assertEqual(state, {'k': 'w'})
|
||
|
|
||
|
def test_worker_exception(self):
|
||
|
blow_up()
|
||
|
task = test_huey.dequeue()
|
||
|
|
||
|
self.run_worker(task)
|
||
|
self.assertTrue(
|
||
|
'Unhandled exception in worker thread' in self.handler.messages)
|
||
|
|
||
|
self.assertEqual(len(test_events._events), 2)
|
||
|
self.assertStatusTask([
|
||
|
('error', task, {'error': True}),
|
||
|
('started', task, {}),
|
||
|
])
|
||
|
|
||
|
def test_retries_and_logging(self):
|
||
|
# this will continually fail
|
||
|
retry_command('blampf')
|
||
|
|
||
|
for i in reversed(range(4)):
|
||
|
task = test_huey.dequeue()
|
||
|
self.assertEqual(task.retries, i)
|
||
|
self.run_worker(task)
|
||
|
if i > 0:
|
||
|
self.assertEqual(
|
||
|
self.handler.messages[-1],
|
||
|
'Re-enqueueing task %s, %s tries left' % (
|
||
|
task.task_id, i - 1))
|
||
|
self.assertStatusTask([
|
||
|
('enqueued', task, {}),
|
||
|
('retrying', task, {}),
|
||
|
('error', task,{}),
|
||
|
('started', task, {}),
|
||
|
])
|
||
|
last_idx = -2
|
||
|
else:
|
||
|
self.assertStatusTask([
|
||
|
('error', task,{}),
|
||
|
('started', task, {}),
|
||
|
])
|
||
|
last_idx = -1
|
||
|
self.assertEqual(self.handler.messages[last_idx],
|
||
|
'Unhandled exception in worker thread')
|
||
|
|
||
|
self.assertEqual(test_huey.dequeue(), None)
|
||
|
|
||
|
def test_retries_with_success(self):
|
||
|
# this will fail once, then succeed
|
||
|
retry_command('blampf', False)
|
||
|
self.assertFalse('blampf' in state)
|
||
|
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task)
|
||
|
self.assertEqual(self.handler.messages, [
|
||
|
'Executing %s' % task,
|
||
|
'Unhandled exception in worker thread',
|
||
|
'Re-enqueueing task %s, 2 tries left' % task.task_id])
|
||
|
|
||
|
task = test_huey.dequeue()
|
||
|
self.assertEqual(task.retries, 2)
|
||
|
self.run_worker(task)
|
||
|
|
||
|
self.assertEqual(state['blampf'], 'fixed')
|
||
|
self.assertEqual(test_huey.dequeue(), None)
|
||
|
|
||
|
self.assertStatusTask([
|
||
|
('finished', task, {}),
|
||
|
('started', task, {}),
|
||
|
('enqueued', task, {'retries': 2}),
|
||
|
('retrying', task, {'retries': 3}),
|
||
|
('error', task, {'error': True}),
|
||
|
('started', task, {}),
|
||
|
])
|
||
|
|
||
|
def test_scheduling(self):
|
||
|
dt = datetime.datetime(2011, 1, 1, 0, 0)
|
||
|
dt2 = datetime.datetime(2037, 1, 1, 0, 0)
|
||
|
ad1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False)
|
||
|
ad2 = modify_state.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False)
|
||
|
|
||
|
# dequeue the past-timestamped task and run it.
|
||
|
worker = self.consumer.worker_threads[0]
|
||
|
worker.check_message()
|
||
|
|
||
|
self.assertTrue('k' in state)
|
||
|
|
||
|
# dequeue the future-timestamped task.
|
||
|
worker.check_message()
|
||
|
|
||
|
# verify the task got stored in the schedule instead of executing
|
||
|
self.assertFalse('k2' in state)
|
||
|
|
||
|
self.assertStatusTask([
|
||
|
('scheduled', ad2.task, {}),
|
||
|
('finished', ad1.task, {}),
|
||
|
('started', ad1.task, {}),
|
||
|
])
|
||
|
|
||
|
# run through an iteration of the scheduler
|
||
|
self.consumer.scheduler_t.loop(dt)
|
||
|
|
||
|
# our command was not enqueued and no events were emitted.
|
||
|
self.assertEqual(len(test_queue._queue), 0)
|
||
|
self.assertEqual(len(test_events._events), 3)
|
||
|
|
||
|
# run through an iteration of the scheduler
|
||
|
self.consumer.scheduler_t.loop(dt2)
|
||
|
|
||
|
# our command was enqueued
|
||
|
self.assertEqual(len(test_queue._queue), 1)
|
||
|
self.assertEqual(len(test_events._events), 4)
|
||
|
self.assertStatusTask([
|
||
|
('enqueued', ad2.task, {}),
|
||
|
])
|
||
|
|
||
|
def test_retry_scheduling(self):
|
||
|
# this will continually fail
|
||
|
retry_command_slow('blampf')
|
||
|
cur_time = datetime.datetime.utcnow()
|
||
|
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task, ts=cur_time)
|
||
|
self.assertEqual(self.handler.messages, [
|
||
|
'Executing %s' % task,
|
||
|
'Unhandled exception in worker thread',
|
||
|
'Re-enqueueing task %s, 2 tries left' % task.task_id,
|
||
|
])
|
||
|
|
||
|
in_11 = cur_time + datetime.timedelta(seconds=11)
|
||
|
tasks_from_sched = test_huey.read_schedule(in_11)
|
||
|
self.assertEqual(tasks_from_sched, [task])
|
||
|
|
||
|
task = tasks_from_sched[0]
|
||
|
self.assertEqual(task.retries, 2)
|
||
|
exec_time = task.execute_time
|
||
|
|
||
|
self.assertEqual((exec_time - cur_time).seconds, 10)
|
||
|
self.assertStatusTask([
|
||
|
('scheduled', task, {
|
||
|
'retries': 2,
|
||
|
'retry_delay': 10,
|
||
|
'execute_time': time.mktime(exec_time.timetuple())}),
|
||
|
('retrying', task, {
|
||
|
'retries': 3,
|
||
|
'retry_delay': 10,
|
||
|
'execute_time': None}),
|
||
|
('error', task, {}),
|
||
|
('started', task, {}),
|
||
|
])
|
||
|
|
||
|
def test_revoking_normal(self):
|
||
|
# enqueue 2 normal commands
|
||
|
r1 = modify_state('k', 'v')
|
||
|
r2 = modify_state('k2', 'v2')
|
||
|
|
||
|
# revoke the first *before it has been checked*
|
||
|
r1.revoke()
|
||
|
self.assertTrue(test_huey.is_revoked(r1.task))
|
||
|
self.assertFalse(test_huey.is_revoked(r2.task))
|
||
|
|
||
|
# dequeue a *single* message (r1)
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task)
|
||
|
|
||
|
self.assertEqual(len(test_events._events), 1)
|
||
|
self.assertStatusTask([
|
||
|
('revoked', r1.task, {}),
|
||
|
])
|
||
|
|
||
|
# no changes and the task was not added to the schedule
|
||
|
self.assertFalse('k' in state)
|
||
|
|
||
|
# dequeue a *single* message
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task)
|
||
|
|
||
|
self.assertTrue('k2' in state)
|
||
|
|
||
|
def test_revoking_schedule(self):
|
||
|
global state
|
||
|
dt = datetime.datetime(2011, 1, 1)
|
||
|
dt2 = datetime.datetime(2037, 1, 1)
|
||
|
|
||
|
r1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False)
|
||
|
r2 = modify_state.schedule(args=('k2', 'v2'), eta=dt, convert_utc=False)
|
||
|
r3 = modify_state.schedule(args=('k3', 'v3'), eta=dt2, convert_utc=False)
|
||
|
r4 = modify_state.schedule(args=('k4', 'v4'), eta=dt2, convert_utc=False)
|
||
|
|
||
|
# revoke r1 and r3
|
||
|
r1.revoke()
|
||
|
r3.revoke()
|
||
|
self.assertTrue(test_huey.is_revoked(r1.task))
|
||
|
self.assertFalse(test_huey.is_revoked(r2.task))
|
||
|
self.assertTrue(test_huey.is_revoked(r3.task))
|
||
|
self.assertFalse(test_huey.is_revoked(r4.task))
|
||
|
|
||
|
expected = [
|
||
|
#state, schedule
|
||
|
({}, 0),
|
||
|
({'k2': 'v2'}, 0),
|
||
|
({'k2': 'v2'}, 1),
|
||
|
({'k2': 'v2'}, 2),
|
||
|
]
|
||
|
|
||
|
for i in range(4):
|
||
|
estate, esc = expected[i]
|
||
|
|
||
|
# dequeue a *single* message
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task)
|
||
|
|
||
|
self.assertEqual(state, estate)
|
||
|
self.assertEqual(len(test_huey.schedule._schedule), esc)
|
||
|
|
||
|
# lets pretend its 2037
|
||
|
future = dt2 + datetime.timedelta(seconds=1)
|
||
|
self.consumer.scheduler_t.loop(future)
|
||
|
self.assertEqual(len(test_huey.schedule._schedule), 0)
|
||
|
|
||
|
# There are two tasks in the queue now (r3 and r4) -- process both.
|
||
|
for i in range(2):
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task, future)
|
||
|
|
||
|
self.assertEqual(state, {'k2': 'v2', 'k4': 'v4'})
|
||
|
|
||
|
def test_revoking_periodic(self):
|
||
|
global state
|
||
|
def loop_periodic(ts):
|
||
|
self.consumer.periodic_t.loop(ts)
|
||
|
for i in range(len(test_queue._queue)):
|
||
|
task = test_huey.dequeue()
|
||
|
self.run_worker(task, ts)
|
||
|
|
||
|
# revoke the command once
|
||
|
every_hour.revoke(revoke_once=True)
|
||
|
self.assertTrue(every_hour.is_revoked())
|
||
|
|
||
|
# it will be skipped the first go-round
|
||
|
dt = datetime.datetime(2011, 1, 1, 0, 0)
|
||
|
loop_periodic(dt)
|
||
|
|
||
|
# it has not been run
|
||
|
self.assertEqual(state, {})
|
||
|
|
||
|
# the next go-round it will be enqueued
|
||
|
loop_periodic(dt)
|
||
|
|
||
|
# our command was run
|
||
|
self.assertEqual(state, {'p': 'y'})
|
||
|
|
||
|
# reset state
|
||
|
state = {}
|
||
|
|
||
|
# revoke the command
|
||
|
every_hour.revoke()
|
||
|
self.assertTrue(every_hour.is_revoked())
|
||
|
|
||
|
# it will no longer be enqueued
|
||
|
loop_periodic(dt)
|
||
|
loop_periodic(dt)
|
||
|
self.assertEqual(state, {})
|
||
|
|
||
|
# restore
|
||
|
every_hour.restore()
|
||
|
self.assertFalse(every_hour.is_revoked())
|
||
|
|
||
|
# it will now be enqueued
|
||
|
loop_periodic(dt)
|
||
|
self.assertEqual(state, {'p': 'y'})
|
||
|
|
||
|
# reset
|
||
|
state = {}
|
||
|
|
||
|
# revoke for an hour
|
||
|
td = datetime.timedelta(seconds=3600)
|
||
|
every_hour.revoke(revoke_until=dt + td)
|
||
|
|
||
|
loop_periodic(dt)
|
||
|
self.assertEqual(state, {})
|
||
|
|
||
|
# after an hour it is back
|
||
|
loop_periodic(dt + td)
|
||
|
self.assertEqual(state, {'p': 'y'})
|
||
|
|
||
|
# our data store should reflect the delay
|
||
|
task_obj = every_hour.task_class()
|
||
|
self.assertEqual(len(test_huey.result_store._results), 1)
|
||
|
self.assertTrue(task_obj.revoke_id in test_huey.result_store._results)
|