import re
import time

import redis
from redis.exceptions import ConnectionError

from huey.backends.base import BaseDataStore
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
from huey.backends.base import BaseSchedule
from huey.utils import EmptyData

def clean_name(name):
    # Strip everything except lowercase letters and digits so the name can be
    # embedded safely in a Redis key.
    return re.sub('[^a-z0-9]', '', name)

class RedisQueue(BaseQueue):
    """
    A simple queue that uses Redis to store messages.
    """
    def __init__(self, name, **connection):
        """
        connection = {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
        }
        """
        super(RedisQueue, self).__init__(name, **connection)

        self.queue_name = 'huey.redis.%s' % clean_name(name)
        self.conn = redis.Redis(**connection)

    def write(self, data):
        # Push onto the left of the list; read() pops from the right, so the
        # queue behaves as FIFO.
        self.conn.lpush(self.queue_name, data)

    def read(self):
        return self.conn.rpop(self.queue_name)

    def remove(self, data):
        # Remove every occurrence of ``data`` from the queue.
        return self.conn.lrem(self.queue_name, data)

    def flush(self):
        self.conn.delete(self.queue_name)

    def __len__(self):
        return self.conn.llen(self.queue_name)

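
# A minimal usage sketch of RedisQueue, kept here purely for illustration and
# never called. It assumes a Redis server on localhost:6379; the queue name
# "example" and the payload are made up.
def _example_queue_usage():
    queue = RedisQueue('example', host='localhost', port=6379, db=0)
    queue.write(b'serialized task payload')    # LPUSH onto the left
    assert len(queue) == 1
    message = queue.read()                     # RPOP from the right (FIFO)
    queue.flush()
    return message
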
class RedisBlockingQueue(RedisQueue):
    """
    Uses a blocking right-pop, so messages are picked up by the consumer
    almost immediately rather than being polled for.
    """
    blocking = True

    def __init__(self, name, read_timeout=None, **connection):
        """
        connection = {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
        }
        """
        super(RedisBlockingQueue, self).__init__(name, **connection)
        self.read_timeout = read_timeout

    def read(self):
        try:
            # brpop() returns a (key, value) tuple, or None once the timeout
            # expires; the subscript then raises TypeError, handled below.
            return self.conn.brpop(
                self.queue_name,
                timeout=self.read_timeout)[1]
        except (ConnectionError, TypeError, IndexError):
            # unfortunately, there is no way to differentiate a socket timing
            # out and a host being unreachable
            return None

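
# Illustrative, uncalled sketch of the blocking queue's timeout behaviour
# (assumes a local Redis server). With read_timeout set, read() blocks for at
# most that many seconds and returns None if nothing arrives.
def _example_blocking_read():
    queue = RedisBlockingQueue('example', read_timeout=1,
                               host='localhost', port=6379, db=0)
    return queue.read()  # None after ~1 second if the queue stays empty
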
# A custom Lua script, passed to Redis, that reads tasks from the schedule
# and atomically pops them from the sorted set before returning them.
# It returns nothing if it is unable to remove the items it reads.
SCHEDULE_POP_LUA = """
local key = KEYS[1]
local unix_ts = ARGV[1]
local res = redis.call('zrangebyscore', key, '-inf', unix_ts)
if #res and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then
    return res
end
"""
class RedisSchedule(BaseSchedule):
    def __init__(self, name, **connection):
        super(RedisSchedule, self).__init__(name, **connection)

        self.key = 'huey.schedule.%s' % clean_name(name)
        self.conn = redis.Redis(**connection)
        self._pop = self.conn.register_script(SCHEDULE_POP_LUA)

    def convert_ts(self, ts):
        # Convert a datetime into a unix timestamp to use as the sorted-set
        # score.
        return time.mktime(ts.timetuple())

    def add(self, data, ts):
        # Score each task by its scheduled unix timestamp (member first, then
        # score, per the legacy ``redis.Redis`` client).
        self.conn.zadd(self.key, data, self.convert_ts(ts))

    def read(self, ts):
        unix_ts = self.convert_ts(ts)
        # invoke the redis lua script that will atomically pop off
        # all the tasks older than the given timestamp
        tasks = self._pop(keys=[self.key], args=[unix_ts])
        return [] if tasks is None else tasks

    def flush(self):
        self.conn.delete(self.key)

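
# Illustrative, uncalled sketch of the schedule (assumes a local Redis server;
# the name and payload are made up): tasks are added with a datetime and read
# back once that moment has passed.
def _example_schedule_usage():
    import datetime
    schedule = RedisSchedule('example', host='localhost', port=6379, db=0)
    schedule.add(b'serialized task', datetime.datetime.now())
    due = schedule.read(datetime.datetime.now())  # atomically pops due tasks
    schedule.flush()
    return due
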
class RedisDataStore(BaseDataStore):
    def __init__(self, name, **connection):
        super(RedisDataStore, self).__init__(name, **connection)

        self.storage_name = 'huey.results.%s' % clean_name(name)
        self.conn = redis.Redis(**connection)

    def put(self, key, value):
        self.conn.hset(self.storage_name, key, value)

    def peek(self, key):
        # Non-destructive read: the value stays in the hash.
        if self.conn.hexists(self.storage_name, key):
            return self.conn.hget(self.storage_name, key)
        return EmptyData

    def get(self, key):
        # Destructive read: the value is deleted once it has been fetched.
        val = self.peek(key)
        if val is not EmptyData:
            self.conn.hdel(self.storage_name, key)
        return val

    def flush(self):
        self.conn.delete(self.storage_name)

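
# Uncalled usage sketch of the result store (assumes a local Redis server; the
# key and value are made up). peek() leaves the value in place, get() removes
# it, and both return EmptyData when the key is absent.
def _example_datastore_usage():
    results = RedisDataStore('example', host='localhost', port=6379, db=0)
    results.put('task-id', b'42')
    assert results.peek('task-id') is not EmptyData  # still stored
    value = results.get('task-id')                   # fetch and delete
    assert results.peek('task-id') is EmptyData
    results.flush()
    return value
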
class RedisEventEmitter(BaseEventEmitter):
    def __init__(self, channel, **connection):
        super(RedisEventEmitter, self).__init__(channel, **connection)
        self.conn = redis.Redis(**connection)

    def emit(self, message):
        self.conn.publish(self.channel, message)


Components = (RedisBlockingQueue, RedisDataStore, RedisSchedule,
              RedisEventEmitter)
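
# A hedged end-to-end smoke test, runnable as a script rather than part of the
# backend API. It assumes a Redis server on localhost:6379 and uses db 15 and
# the name "demo" purely for illustration.
if __name__ == '__main__':
    conn = {'host': 'localhost', 'port': 6379, 'db': 15}
    queue = RedisBlockingQueue('demo', read_timeout=1, **conn)
    results = RedisDataStore('demo', **conn)
    emitter = RedisEventEmitter('huey.demo.events', **conn)

    queue.write(b'task-1')
    print('queued messages: %s' % len(queue))
    print('read back: %s' % queue.read())

    results.put('task-1', b'42')
    print('result: %s' % results.get('task-1'))

    emitter.emit('{"status": "demo complete"}')

    queue.flush()
    results.flush()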