TISbackup/lib/huey/backends/redis_backend.py

154 lines
4.4 KiB
Python
Raw Normal View History

import re
import time
import redis
from redis.exceptions import ConnectionError
from huey.backends.base import BaseDataStore
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
from huey.backends.base import BaseSchedule
from huey.utils import EmptyData
def clean_name(name):
    """Return ``name`` with every character outside ``a-z``/``0-9`` removed.

    Uppercase letters, whitespace and punctuation are all dropped (nothing
    is lowercased first), so ``"My Queue"`` becomes ``"yueue"``.
    """
    disallowed = re.compile('[^a-z0-9]')
    return disallowed.sub('', name)
class RedisQueue(BaseQueue):
    """
    A simple queue that keeps its messages in a Redis list.

    Messages are pushed on the left of the list and popped from the right.
    """

    def __init__(self, name, **connection):
        """
        Set up the list key for ``name`` and open a Redis client.

        ``connection`` is forwarded unchanged to ``redis.Redis``, e.g.::

            connection = {
                'host': 'localhost',
                'port': 6379,
                'db': 0,
            }
        """
        super(RedisQueue, self).__init__(name, **connection)
        sanitized = clean_name(name)
        self.queue_name = 'huey.redis.%s' % sanitized
        self.conn = redis.Redis(**connection)

    def write(self, data):
        """Push ``data`` onto the left end of the list."""
        self.conn.lpush(self.queue_name, data)

    def read(self):
        """Pop and return the right-most message without blocking."""
        message = self.conn.rpop(self.queue_name)
        return message

    def remove(self, data):
        """Remove ``data`` from the list and return the client's ``lrem`` result."""
        # NOTE(review): the two-argument ``lrem(name, value)`` form depends on
        # the installed redis-py client's signature -- confirm before upgrading.
        removed = self.conn.lrem(self.queue_name, data)
        return removed

    def flush(self):
        """Delete the whole list key, discarding every queued message."""
        self.conn.delete(self.queue_name)

    def __len__(self):
        """Return the list length as reported by ``llen``."""
        size = self.conn.llen(self.queue_name)
        return size
class RedisBlockingQueue(RedisQueue):
    """
    Queue variant that reads with the blocking right pop, so messages
    should be picked up by the consumer almost immediately instead of
    being polled for.
    """
    blocking = True

    def __init__(self, name, read_timeout=None, **connection):
        """
        Create the queue; ``read_timeout`` is later forwarded as the
        ``timeout`` argument of ``brpop``.

        ``connection`` is forwarded unchanged to ``redis.Redis``, e.g.::

            connection = {
                'host': 'localhost',
                'port': 6379,
                'db': 0,
            }
        """
        super(RedisBlockingQueue, self).__init__(name, **connection)
        self.read_timeout = read_timeout

    def read(self):
        """Block for the next message; return ``None`` when none was obtained."""
        try:
            popped = self.conn.brpop(
                self.queue_name,
                timeout=self.read_timeout)
            # the payload is the second item of the reply; subscripting a
            # missing/short reply raises TypeError/IndexError, handled below
            return popped[1]
        except (ConnectionError, TypeError, IndexError):
            # unfortunately, a socket timing out cannot be told apart from
            # an unreachable host, so both are reported as "no message"
            return None
# A custom Lua script, passed to Redis, that reads tasks from the schedule
# and atomically pops them from the sorted set before returning them.
#
# KEYS[1] is the sorted-set key and ARGV[1] the cut-off unix timestamp. The
# script reads every member scored from -inf up to the cut-off, removes that
# same score range, and returns the members only when the number removed
# equals the number read; otherwise it returns nothing.
#
# NOTE(review): in Lua every number (including 0) is truthy, so the `#res`
# operand never short-circuits; with nothing due the comparison is 0 == 0
# and the empty table is returned.
SCHEDULE_POP_LUA = """
local key = KEYS[1]
local unix_ts = ARGV[1]
local res = redis.call('zrangebyscore', key, '-inf', unix_ts)
if #res and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then
return res
end
"""
class RedisSchedule(BaseSchedule):
    """
    Schedule stored in a Redis sorted set, with tasks scored by the unix
    timestamp produced by ``convert_ts``.
    """

    def __init__(self, name, **connection):
        """
        Set up the sorted-set key for ``name``, open a Redis client from
        ``connection`` and register the pop script with it.
        """
        super(RedisSchedule, self).__init__(name, **connection)
        sanitized = clean_name(name)
        self.key = 'huey.schedule.%s' % sanitized
        self.conn = redis.Redis(**connection)
        self._pop = self.conn.register_script(SCHEDULE_POP_LUA)

    def convert_ts(self, ts):
        """Convert ``ts`` to a unix timestamp via ``time.mktime``."""
        # presumably ``ts`` is a datetime; anything exposing ``timetuple()``
        # works here
        time_struct = ts.timetuple()
        return time.mktime(time_struct)

    def add(self, data, ts):
        """Store ``data`` in the schedule, scored by the timestamp of ``ts``."""
        score = self.convert_ts(ts)
        # NOTE(review): the (key, member, score) positional order depends on
        # the installed redis-py client's ``zadd`` signature -- confirm before
        # upgrading.
        self.conn.zadd(self.key, data, score)

    def read(self, ts):
        """Pop and return every task scheduled at or before ``ts``."""
        cutoff = self.convert_ts(ts)
        # invoke the redis lua script that will atomically pop off
        # all the tasks older than the given timestamp
        due = self._pop(keys=[self.key], args=[cutoff])
        if due is None:
            return []
        return due

    def flush(self):
        """Delete the sorted-set key, discarding every scheduled task."""
        self.conn.delete(self.key)
class RedisDataStore(BaseDataStore):
    """
    Result store kept in a single Redis hash, one field per key.
    """

    def __init__(self, name, **connection):
        """
        Set up the hash key for ``name`` and open a Redis client from
        ``connection`` (forwarded unchanged to ``redis.Redis``).
        """
        super(RedisDataStore, self).__init__(name, **connection)
        self.storage_name = 'huey.results.%s' % clean_name(name)
        self.conn = redis.Redis(**connection)

    def put(self, key, value):
        """Store ``value`` under ``key`` in the hash."""
        self.conn.hset(self.storage_name, key, value)

    def peek(self, key):
        """
        Return the value stored under ``key`` without removing it, or
        ``EmptyData`` when the field does not exist.
        """
        # A single HGET is enough: the client returns None for a missing
        # field. The previous HEXISTS + HGET pair cost two round trips and
        # could return None (instead of EmptyData) if the field was deleted
        # between the two commands.
        value = self.conn.hget(self.storage_name, key)
        if value is None:
            return EmptyData
        return value

    def get(self, key):
        """
        Return the value stored under ``key`` and delete it, or
        ``EmptyData`` when the field does not exist.

        The read and the delete are issued as separate commands.
        """
        val = self.peek(key)
        if val is not EmptyData:
            self.conn.hdel(self.storage_name, key)
        return val

    def flush(self):
        """Delete the whole hash, discarding every stored result."""
        self.conn.delete(self.storage_name)
class RedisEventEmitter(BaseEventEmitter):
    """
    Event emitter that publishes messages through Redis ``PUBLISH``.
    """

    def __init__(self, channel, **connection):
        """
        Initialise the base emitter with ``channel`` and open a Redis
        client from ``connection``.
        """
        super(RedisEventEmitter, self).__init__(channel, **connection)
        self.conn = redis.Redis(**connection)

    def emit(self, message):
        """Publish ``message`` on this emitter's channel."""
        # ``self.channel`` is not assigned in this class; presumably the
        # base-class initialiser sets it -- verify against BaseEventEmitter.
        target = self.channel
        self.conn.publish(target, message)
# The Redis-backed classes bundled by this module: queue, data store,
# schedule and event emitter, in that order.
# NOTE(review): presumably consumed positionally elsewhere -- keep the order.
Components = (
    RedisBlockingQueue,
    RedisDataStore,
    RedisSchedule,
    RedisEventEmitter,
)