TISbackup/lib/huey/tests/backends.py

171 lines
5.9 KiB
Python

from collections import deque
import datetime
import os
import sys
import tempfile
import unittest
from huey.api import Huey
from huey.backends.dummy import DummyDataStore
from huey.backends.dummy import DummyEventEmitter
from huey.backends.dummy import DummyQueue
from huey.backends.dummy import DummySchedule
from huey.utils import EmptyData
from huey.backends.sqlite_backend import SqliteDataStore
from huey.backends.sqlite_backend import SqliteQueue
from huey.backends.sqlite_backend import SqliteSchedule
try:
from huey.backends.redis_backend import RedisDataStore
from huey.backends.redis_backend import RedisEventEmitter
from huey.backends.redis_backend import RedisQueue
from huey.backends.redis_backend import RedisSchedule
except ImportError:
RedisQueue = RedisDataStore = RedisSchedule = RedisEventEmitter = None
try:
from huey.backends.rabbitmq_backend import RabbitQueue, RabbitEventEmitter
except ImportError:
RabbitQueue = RabbitEventEmitter = None
if sys.version_info[0] == 2:
redis_kwargs = {}
else:
redis_kwargs = {'decode_responses': True}
QUEUES = (DummyQueue, RedisQueue, SqliteQueue, RabbitQueue)
DATA_STORES = (DummyDataStore, RedisDataStore, SqliteDataStore, None)
SCHEDULES = (DummySchedule, RedisSchedule, SqliteSchedule, None)
EVENTS = (DummyEventEmitter, RedisEventEmitter, None, RabbitEventEmitter)
class HueyBackendTestCase(unittest.TestCase):
def setUp(self):
self.sqlite_location = tempfile.mkstemp(prefix='hueytest.')[1]
def tearDown(self):
os.unlink(self.sqlite_location)
def test_queues(self):
result_store = DummyDataStore('dummy')
for q in QUEUES:
if not q:
continue
if issubclass(q, SqliteQueue):
queue = q('test', location=self.sqlite_location)
elif issubclass(q, RedisQueue):
queue = q('test', **redis_kwargs)
else:
queue = q('test')
queue.flush()
queue.write('a')
queue.write('b')
self.assertEqual(len(queue), 2)
self.assertEqual(queue.read(), 'a')
self.assertEqual(queue.read(), 'b')
self.assertEqual(queue.read(), None)
queue.write('c')
queue.write('d')
queue.write('c')
queue.write('x')
queue.write('d')
self.assertEqual(len(queue), 5)
self.assertEqual(queue.remove('c'), 2)
self.assertEqual(len(queue), 3)
self.assertEqual(queue.read(), 'd')
self.assertEqual(queue.read(), 'x')
self.assertEqual(queue.read(), 'd')
queue.flush()
test_huey = Huey(queue, result_store)
@test_huey.task()
def test_queues_add(k, v):
return k + v
res = test_queues_add('k', 'v')
self.assertEqual(len(queue), 1)
task = test_huey.dequeue()
test_huey.execute(task)
self.assertEqual(res.get(), 'kv')
res = test_queues_add('\xce', '\xcf')
task = test_huey.dequeue()
test_huey.execute(task)
self.assertEqual(res.get(), '\xce\xcf')
def test_data_stores(self):
for d in DATA_STORES:
if not d:
continue
if issubclass(d, SqliteDataStore):
data_store = d('test', location=self.sqlite_location)
elif issubclass(d, RedisDataStore):
data_store = d('test', **redis_kwargs)
else:
data_store = d('test')
data_store.put('k1', 'v1')
data_store.put('k2', 'v2')
data_store.put('k3', 'v3')
self.assertEqual(data_store.peek('k2'), 'v2')
self.assertEqual(data_store.get('k2'), 'v2')
self.assertEqual(data_store.peek('k2'), EmptyData)
self.assertEqual(data_store.get('k2'), EmptyData)
self.assertEqual(data_store.peek('k3'), 'v3')
data_store.put('k3', 'v3-2')
self.assertEqual(data_store.peek('k3'), 'v3-2')
def test_schedules(self):
for s in SCHEDULES:
if not s:
continue
if issubclass(s, SqliteSchedule):
schedule = s('test', location=self.sqlite_location)
elif issubclass(s, RedisSchedule):
schedule = s('test', **redis_kwargs)
else:
schedule = s('test')
dt1 = datetime.datetime(2013, 1, 1, 0, 0)
dt2 = datetime.datetime(2013, 1, 2, 0, 0)
dt3 = datetime.datetime(2013, 1, 3, 0, 0)
dt4 = datetime.datetime(2013, 1, 4, 0, 0)
# Add to schedule out-of-order to ensure sorting is performed by
# the schedule.
schedule.add('s2', dt2)
schedule.add('s1', dt1)
schedule.add('s4', dt4)
schedule.add('s3', dt3)
# Ensure that asking for a timestamp previous to any item in the
# schedule returns empty list.
self.assertEqual(
schedule.read(dt1 - datetime.timedelta(days=1)),
[])
# Ensure the upper boundary is inclusive of whatever timestamp
# is passed in.
self.assertEqual(schedule.read(dt3), ['s1', 's2', 's3'])
self.assertEqual(schedule.read(dt3), [])
# Ensure the schedule is flushed and an empty schedule returns an
# empty list.
self.assertEqual(schedule.read(dt4), ['s4'])
self.assertEqual(schedule.read(dt4), [])
def test_events(self):
for e in EVENTS:
if not e:
continue
e = e('test')
messages = ['a', 'b', 'c', 'd']
for message in messages:
e.emit(message)
if hasattr(e, '_events'):
self.assertEqual(e._events, deque(['d', 'c', 'b', 'a']))