added support for rpm packaging and basic support for deb

This commit is contained in:
root
2017-03-29 18:01:35 +02:00
parent ce758e8129
commit 783e7e6d0d
33 changed files with 3455 additions and 0 deletions
View File
+113
View File
@@ -0,0 +1,113 @@
class BaseQueue(object):
    """
    Abstract task queue. Every queue backend derives from this class and
    overrides the methods below.
    """
    # True when the backend itself waits for new data in read(); False when
    # the consumer is expected to poll.
    blocking = False

    def __init__(self, name, **connection):
        """
        Set up the queue - performed once, when the module is loaded.

        :param name: A string naming this queue
        :param connection: Backend-specific connection parameters
        """
        self.name, self.connection = name, connection

    def write(self, data):
        """Push 'data' onto the queue."""
        raise NotImplementedError()

    def read(self):
        """
        Pop the next item from the queue. Implementations return None when
        nothing is available; an empty queue must not raise.
        """
        raise NotImplementedError()

    def remove(self, data):
        """Remove the given 'data' from the queue."""
        raise NotImplementedError()

    def flush(self):
        """Delete every item in the queue."""
        raise NotImplementedError()

    def __len__(self):
        """Return the number of queued items (used primarily in tests)."""
        raise NotImplementedError()
class BaseSchedule(object):
    """
    Abstract store of timestamped task data. Schedule backends derive from
    this class and override the methods below.
    """
    def __init__(self, name, **connection):
        """
        Set up the schedule.

        :param name: A string naming this schedule
        :param connection: Backend-specific connection parameters
        """
        self.name, self.connection = name, connection

    def add(self, data, ts):
        """Add 'data' to the task schedule under the timestamp 'ts'."""
        raise NotImplementedError()

    def read(self, ts):
        """Read the scheduled items for the timestamp 'ts'."""
        raise NotImplementedError()

    def flush(self):
        """Delete every item in the schedule."""
        raise NotImplementedError()
class BaseDataStore(object):
    """
    Abstract key/value data store. Data store backends derive from this
    class and override the methods below.
    """
    def __init__(self, name, **connection):
        """
        Set up the data store.

        :param name: A string naming this data store
        :param connection: Backend-specific connection parameters
        """
        self.name, self.connection = name, connection

    def put(self, key, value):
        """Store 'value' under 'key'."""
        raise NotImplementedError()

    def peek(self, key):
        # NOTE(review): the in-memory backend returns the value and leaves
        # it in place -- confirm that this is the intended contract.
        raise NotImplementedError()

    def get(self, key):
        # NOTE(review): the in-memory backend returns the value and removes
        # it -- confirm that this is the intended contract.
        raise NotImplementedError()

    def flush(self):
        """Delete everything held by the data store."""
        raise NotImplementedError()
class BaseEventEmitter(object):
    """
    Abstract event emitter. Emitter backends derive from this class and
    override emit().
    """
    def __init__(self, channel, **connection):
        """
        :param channel: The channel this emitter sends messages on
        :param connection: Backend-specific connection parameters
        """
        self.channel, self.connection = channel, connection

    def emit(self, message):
        """Send 'message' out; must be provided by the backend."""
        raise NotImplementedError()
# The abstract component classes defined in this module, listed in the order
# (queue, data store, schedule, event emitter).
Components = (BaseQueue, BaseDataStore, BaseSchedule, BaseEventEmitter)
+103
View File
@@ -0,0 +1,103 @@
"""
Test-only implementations of Queue and DataStore. These will not work for
real applications because they only store tasks/results in memory.
"""
from collections import deque
import heapq
from huey.backends.base import BaseDataStore
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
from huey.backends.base import BaseSchedule
from huey.utils import EmptyData
class DummyQueue(BaseQueue):
    """
    In-memory queue backed by a plain list. New items go in at the front and
    are read from the back, so items come out in the order they were written.
    """
    def __init__(self, *args, **kwargs):
        super(DummyQueue, self).__init__(*args, **kwargs)
        self._queue = []

    def write(self, data):
        """Put 'data' at the front of the list."""
        self._queue[0:0] = [data]

    def read(self):
        """Pop the item at the back of the list, or return None if empty."""
        try:
            item = self._queue.pop()
        except IndexError:
            item = None
        return item

    def flush(self):
        """Replace the list with a new, empty one."""
        self._queue = []

    def remove(self, data):
        """
        Drop every item equal to 'data' and return how many were dropped.
        """
        current = self._queue
        kept = [item for item in current if not item == data]
        removed = len(current) - len(kept)
        self._queue = kept
        return removed

    def __len__(self):
        """Return the number of items currently held."""
        return len(self._queue)
class DummySchedule(BaseSchedule):
    """
    In-memory schedule kept as a heap of (timestamp, data) tuples.
    """
    def __init__(self, *args, **kwargs):
        super(DummySchedule, self).__init__(*args, **kwargs)
        self._schedule = []

    def add(self, data, ts):
        """Push the (ts, data) pair onto the heap."""
        entry = (ts, data)
        heapq.heappush(self._schedule, entry)

    def read(self, ts):
        """
        Pop and return, earliest first, the data of every entry whose
        timestamp is less than or equal to 'ts'.
        """
        due = []
        while self._schedule:
            when, payload = heapq.heappop(self._schedule)
            if when <= ts:
                due.append(payload)
                continue
            # First entry that is not due yet: put it back and stop, since
            # everything still on the heap is at least as late.
            self.add(payload, when)
            break
        return due

    def flush(self):
        """Replace the heap with a new, empty list."""
        self._schedule = []
class DummyDataStore(BaseDataStore):
    """
    In-memory data store backed by a dict.
    """
    def __init__(self, *args, **kwargs):
        super(DummyDataStore, self).__init__(*args, **kwargs)
        self._results = {}

    def put(self, key, value):
        """Store 'value' under 'key', replacing any previous value."""
        self._results[key] = value

    def peek(self, key):
        """Return the value for 'key' without removing it, else EmptyData."""
        try:
            return self._results[key]
        except KeyError:
            return EmptyData

    def get(self, key):
        """Remove and return the value for 'key', else return EmptyData."""
        try:
            return self._results.pop(key)
        except KeyError:
            return EmptyData

    def flush(self):
        """Replace the dict with a new, empty one."""
        self._results = {}
class DummyEventEmitter(BaseEventEmitter):
    """
    In-memory event emitter that keeps recent messages in a deque, newest
    first.
    """
    def __init__(self, *args, **kwargs):
        super(DummyEventEmitter, self).__init__(*args, **kwargs)
        self._events = deque()
        # Number of events kept after the buffer is trimmed.
        self.__size = 100

    def emit(self, message):
        """
        Record 'message' at the left end of the buffer.

        Once the buffer grows past 1.5 times the target size, the oldest
        events (at the right end) are discarded until only the target
        number remain.
        """
        self._events.appendleft(message)
        if len(self._events) > self.__size * 1.5:
            while len(self._events) > self.__size:
                # deque has no popright(); pop() removes from the right end.
                self._events.pop()
# The in-memory component classes defined in this module, listed in the order
# (queue, data store, schedule, event emitter).
Components = (DummyQueue, DummyDataStore, DummySchedule, DummyEventEmitter)
+153
View File
@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
__author__ = 'deathowl'
import datetime
import re
import time
import pika
from pika.exceptions import AMQPConnectionError
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
def clean_name(name):
    """
    Return 'name' with every character other than a-z and 0-9 removed.

    Upper-case letters are removed as well; they are not lower-cased.
    """
    stripped = re.sub(pattern='[^a-z0-9]', repl='', string=name)
    return stripped
class RabbitQueue(BaseQueue):
    """
    A simple Queue that uses RabbitMQ (through pika) to store messages
    """
    def __init__(self, name, **connection):
        """
        Open a blocking connection and declare the durable queue.

        connection = {
            'host': 'localhost',
            'port': 5672,
            'username': 'guest',
            'password': 'guest',
            'vhost': '/',
            'ssl': False
        }
        """
        super(RabbitQueue, self).__init__(name, **connection)
        self.queue_name = 'huey.rabbit.%s' % clean_name(name)
        credentials = pika.PlainCredentials(
            connection.get('username', 'guest'),
            connection.get('password', 'guest'))
        connection_params = pika.ConnectionParameters(
            host=connection.get('host', 'localhost'),
            port=connection.get('port', 5672),
            credentials=credentials,
            virtual_host=connection.get('vhost', '/'),
            ssl=connection.get('ssl', False))
        self.conn = pika.BlockingConnection(connection_params)
        self.channel = self.conn.channel()
        self.channel.queue_declare(self.queue_name, durable=True)

    def write(self, data):
        """Publish 'data' to the queue through the default exchange."""
        self.channel.basic_publish(
            exchange='',
            routing_key=self.queue_name,
            body=data)

    def read(self):
        """Return the next message body, or None when the queue is empty."""
        return self.get_data_from_queue(self.queue_name)

    def remove(self, data):
        """
        Remove messages whose body equals 'data'; return how many were
        removed.
        """
        # This is not something you usually do in rabbit, this is the only
        # operation, which is not atomic, but this "hack" should do the trick.
        amount = 0
        idx = 0
        qlen = len(self)
        if qlen == 0:
            # Nothing to inspect. Without this guard consume() would wait
            # for a delivery, because the loop's exit check only runs after
            # a message has been received.
            return amount
        for method_frame, _, body in self.channel.consume(self.queue_name):
            idx += 1
            if body == data:
                self.channel.basic_ack(method_frame.delivery_tag)
                amount += 1
            else:
                # Not a match: hand the message back to the queue.
                self.channel.basic_nack(
                    method_frame.delivery_tag,
                    requeue=True)
            # Stop after as many deliveries as the queue held at the start.
            if idx >= qlen:
                break
        self.channel.cancel()
        return amount

    def flush(self):
        """Purge every message from the queue; always returns True."""
        self.channel.queue_purge(queue=self.queue_name)
        return True

    def __len__(self):
        """Return the message count reported when re-declaring the queue."""
        queue = self.channel.queue_declare(self.queue_name, durable=True)
        return queue.method.message_count

    def get_data_from_queue(self, queue):
        """
        Consume and acknowledge a single message from 'queue'.

        Returns the message body, or None when this instance's queue
        currently reports no messages.
        """
        data = None
        # Checked up front so consume() is never entered on an empty queue.
        if len(self) == 0:
            return None
        for method_frame, _, body in self.channel.consume(queue):
            data = body
            self.channel.basic_ack(method_frame.delivery_tag)
            break
        self.channel.cancel()
        return data
class RabbitBlockingQueue(RabbitQueue):
    """
    RabbitQueue variant flagged as blocking, so the consumer treats reads
    as waiting for data rather than polling for it.

    NOTE(review): read() goes through get_data_from_queue(), which returns
    None straight away for an empty queue -- confirm that the consumer
    copes with a "blocking" queue that can return immediately.
    """
    blocking = True

    def read(self):
        """
        Return the next message body; None when the queue is empty or the
        AMQP connection fails.
        """
        try:
            data = self.get_data_from_queue(self.queue_name)
        except AMQPConnectionError:
            data = None
        return data
class RabbitEventEmitter(BaseEventEmitter):
    """
    Event emitter that publishes messages to the durable 'huey.events'
    fanout exchange.
    """
    def __init__(self, channel, **connection):
        """
        Open a blocking connection and declare the events exchange.

        'connection' accepts the keys host, port, username, password, vhost
        and ssl; each falls back to a default when absent.
        """
        super(RabbitEventEmitter, self).__init__(channel, **connection)
        username = connection.get('username', 'guest')
        password = connection.get('password', 'guest')
        params = pika.ConnectionParameters(
            host=connection.get('host', 'localhost'),
            port=connection.get('port', 5672),
            credentials=pika.PlainCredentials(username, password),
            virtual_host=connection.get('vhost', '/'),
            ssl=connection.get('ssl', False))
        self.conn = pika.BlockingConnection(params)
        # Replaces the 'channel' value stored by the base class with the
        # pika channel object.
        self.channel = self.conn.channel()
        self.exchange_name = 'huey.events'
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            type='fanout',
            auto_delete=False,
            durable=True)

    def emit(self, message):
        """Publish 'message' as text/plain to the events exchange."""
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key='',
            body=message,
            properties=pika.BasicProperties(
                content_type="text/plain",
                delivery_mode=2))
# Component classes for the rabbit backend, listed in the order
# (queue, data store, schedule, event emitter). This module defines no data
# store or schedule class, so those two slots are None.
Components = (RabbitBlockingQueue, None, None, RabbitEventEmitter)
+153
View File
@@ -0,0 +1,153 @@
import re
import time
import redis
from redis.exceptions import ConnectionError
from huey.backends.base import BaseDataStore
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
from huey.backends.base import BaseSchedule
from huey.utils import EmptyData
def clean_name(name):
    """
    Return 'name' with every character other than a-z and 0-9 removed.

    Upper-case letters are removed as well; they are not lower-cased.
    """
    stripped = re.sub(pattern='[^a-z0-9]', repl='', string=name)
    return stripped
class RedisQueue(BaseQueue):
    """
    A simple Queue that stores its messages in a redis list
    """
    def __init__(self, name, **connection):
        """
        Create the redis client; 'connection' is passed to it unchanged.

        connection = {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
        }
        """
        super(RedisQueue, self).__init__(name, **connection)
        cleaned = clean_name(name)
        self.queue_name = 'huey.redis.%s' % cleaned
        self.conn = redis.Redis(**connection)

    def write(self, data):
        """Push 'data' onto the left end of the list."""
        key = self.queue_name
        self.conn.lpush(key, data)

    def read(self):
        """Pop and return the item at the right end of the list."""
        key = self.queue_name
        return self.conn.rpop(key)

    def remove(self, data):
        """Remove 'data' from the list via LREM and return redis' result."""
        key = self.queue_name
        return self.conn.lrem(key, data)

    def flush(self):
        """Delete the whole list."""
        key = self.queue_name
        self.conn.delete(key)

    def __len__(self):
        """Return the length of the list."""
        key = self.queue_name
        return self.conn.llen(key)
class RedisBlockingQueue(RedisQueue):
    """
    Reads with the blocking right pop (BRPOP), so the consumer receives
    messages close to immediately instead of having to poll for them.
    """
    blocking = True

    def __init__(self, name, read_timeout=None, **connection):
        """
        :param name: A string naming this queue
        :param read_timeout: Timeout value passed to BRPOP on every read
        :param connection: Connection parameters, e.g.

        connection = {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
        }
        """
        super(RedisBlockingQueue, self).__init__(name, **connection)
        self.read_timeout = read_timeout

    def read(self):
        """
        Return the next message, or None if the read produced nothing.
        """
        try:
            popped = self.conn.brpop(
                self.queue_name,
                timeout=self.read_timeout)
            # The message is the second element of the reply; a reply that
            # cannot be indexed this way falls through to the handler below.
            return popped[1]
        except (ConnectionError, TypeError, IndexError):
            # unfortunately, there is no way to differentiate a socket timing
            # out and a host being unreachable
            return None
# A custom Lua script to pass to redis. It reads every task in the schedule's
# sorted set whose score is at or below the given unix timestamp, removes that
# same score range, and returns the tasks, so tasks are popped atomically.
# It won't return anything if the number of items it removes differs from the
# number of items it read.
# NOTE(review): in Lua the number 0 is truthy, so the `#res` operand of `and`
# never short-circuits -- confirm whether `#res > 0` was intended.
SCHEDULE_POP_LUA = """
local key = KEYS[1]
local unix_ts = ARGV[1]
local res = redis.call('zrangebyscore', key, '-inf', unix_ts)
if #res and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then
return res
end
"""
class RedisSchedule(BaseSchedule):
    """
    Schedule stored in a redis sorted set, scored by unix timestamp.
    """
    def __init__(self, name, **connection):
        """Create the redis client and register the pop script with it."""
        super(RedisSchedule, self).__init__(name, **connection)
        self.key = 'huey.schedule.%s' % clean_name(name)
        client = redis.Redis(**connection)
        self.conn = client
        self._pop = client.register_script(SCHEDULE_POP_LUA)

    def convert_ts(self, ts):
        """Convert the datetime 'ts' to a unix timestamp via time.mktime."""
        as_struct = ts.timetuple()
        return time.mktime(as_struct)

    def add(self, data, ts):
        """Add 'data' to the sorted set, scored by the converted 'ts'."""
        score = self.convert_ts(ts)
        self.conn.zadd(self.key, data, score)

    def read(self, ts):
        """
        Pop and return the tasks scheduled at or before 'ts'; an empty list
        when the script returns nothing.
        """
        # invoke the redis lua script that will atomically pop off
        # all the tasks older than the given timestamp
        popped = self._pop(keys=[self.key], args=[self.convert_ts(ts)])
        if popped is None:
            return []
        return popped

    def flush(self):
        """Delete the whole sorted set."""
        self.conn.delete(self.key)
class RedisDataStore(BaseDataStore):
    """
    Data store that keeps its values in a single redis hash.
    """
    def __init__(self, name, **connection):
        """Create the redis client; 'connection' is passed to it unchanged."""
        super(RedisDataStore, self).__init__(name, **connection)
        self.storage_name = 'huey.results.%s' % clean_name(name)
        self.conn = redis.Redis(**connection)

    def put(self, key, value):
        """Store 'value' under 'key' in the hash."""
        self.conn.hset(self.storage_name, key, value)

    def peek(self, key):
        """
        Return the value stored under 'key' without removing it, or
        EmptyData when the hash has no such field.
        """
        # One HGET instead of HEXISTS followed by HGET: the field can no
        # longer vanish between the two calls, and HGET already returns
        # None for a missing field.
        val = self.conn.hget(self.storage_name, key)
        return EmptyData if val is None else val

    def get(self, key):
        """
        Remove and return the value stored under 'key', or EmptyData when
        the hash has no such field.
        """
        # Read and delete inside one transactional pipeline so that two
        # callers cannot both read the value before either deletes it.
        pipe = self.conn.pipeline()
        pipe.hget(self.storage_name, key)
        pipe.hdel(self.storage_name, key)
        val, _ = pipe.execute()
        return EmptyData if val is None else val

    def flush(self):
        """Delete the whole hash."""
        self.conn.delete(self.storage_name)
class RedisEventEmitter(BaseEventEmitter):
    """
    Event emitter that publishes messages on a redis pub/sub channel.
    """
    def __init__(self, channel, **connection):
        """Create the redis client; 'connection' is passed to it unchanged."""
        super(RedisEventEmitter, self).__init__(channel, **connection)
        client = redis.Redis(**connection)
        self.conn = client

    def emit(self, message):
        """Publish 'message' on this emitter's channel."""
        target = self.channel
        self.conn.publish(target, message)
# Component classes for the redis backend, listed in the order
# (queue, data store, schedule, event emitter).
Components = (RedisBlockingQueue, RedisDataStore, RedisSchedule,
RedisEventEmitter)
+205
View File
@@ -0,0 +1,205 @@
""" SQLite backend for Huey.
Inspired from a snippet by Thiago Arruda [1]
[1] http://flask.pocoo.org/snippets/88/
"""
import json
import sqlite3
import time
try:
from thread import get_ident
except ImportError: # Python 3
try:
from threading import get_ident
except ImportError:
from _thread import get_ident
buffer = memoryview
from huey.backends.base import BaseDataStore
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
from huey.backends.base import BaseSchedule
from huey.utils import EmptyData
class _SqliteDatabase(object):
    """
    Hands out sqlite3 connections for one database file.
    """
    def __init__(self, location):
        """
        :param location: Path of the database file
        :raises ValueError: if 'location' is ':memory:'
        """
        if location == ':memory:':
            raise ValueError("Database location has to be a file path, "
                             "in-memory databases are not supported.")
        self.location = location
        self._conn_cache = {}
        pragmas = (
            # Enable write-ahead logging
            "PRAGMA journal_mode=WAL;",
            # Hand over syncing responsibility to OS
            "PRAGMA synchronous=OFF;",
            # Store temporary tables and indices in memory
            "PRAGMA temp_store=MEMORY;",
        )
        with self.get_connection() as conn:
            for statement in pragmas:
                conn.execute(statement)

    def get_connection(self, immediate=False):
        """ Obtain a sqlite3.Connection instance for the database.

        With 'immediate' set, a brand-new connection using the IMMEDIATE
        isolation level is returned and is not cached. Otherwise
        connections are cached per thread, so a calling thread always gets
        the same Connection object back.
        """
        if immediate:
            return sqlite3.Connection(self.location, timeout=60,
                                      isolation_level="IMMEDIATE")
        thread_id = get_ident()
        try:
            return self._conn_cache[thread_id]
        except KeyError:
            conn = sqlite3.Connection(self.location, timeout=60)
            self._conn_cache[thread_id] = conn
            return conn
class SqliteQueue(BaseQueue):
    """
    A simple Queue that uses SQLite to store messages
    """
    # SQL templates; {0} is replaced with this queue's table name.
    _create = """
        CREATE TABLE IF NOT EXISTS {0}
        (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          item BLOB
        )
    """
    _count = "SELECT COUNT(*) FROM {0}"
    _append = "INSERT INTO {0} (item) VALUES (?)"
    _get = "SELECT id, item FROM {0} ORDER BY id LIMIT 1"
    _remove_by_value = "DELETE FROM {0} WHERE item = ?"
    _remove_by_id = "DELETE FROM {0} WHERE id = ?"
    _flush = "DELETE FROM {0}"

    def __init__(self, name, location):
        """
        Create the queue table if it does not exist yet.

        :param name: Queue name; the table is called 'huey_queue_<name>'
        :param location: Path of the SQLite database file
        """
        super(SqliteQueue, self).__init__(name, location=location)
        self.queue_name = 'huey_queue_{0}'.format(name)
        self._db = _SqliteDatabase(location)
        with self._db.get_connection() as conn:
            conn.execute(self._create.format(self.queue_name))

    def write(self, data):
        """Append 'data' as a new row."""
        with self._db.get_connection() as conn:
            conn.execute(self._append.format(self.queue_name), (data,))

    def read(self):
        """
        Remove and return the item with the lowest id, or None when the
        table is empty.
        """
        # get_connection(immediate=True) builds a new connection on every
        # call. The connection's context manager only commits or rolls
        # back, so the connection is closed explicitly once the read is
        # done.
        conn = self._db.get_connection(immediate=True)
        try:
            with conn:
                row = conn.execute(
                    self._get.format(self.queue_name)).fetchone()
                if row is None:
                    return None
                row_id, data = row
                if row_id:
                    conn.execute(
                        self._remove_by_id.format(self.queue_name),
                        (row_id,))
                return data
        finally:
            conn.close()

    def remove(self, data):
        """Delete every row whose item equals 'data'; return the row count."""
        with self._db.get_connection() as conn:
            return conn.execute(self._remove_by_value.format(self.queue_name),
                                (data,)).rowcount

    def flush(self):
        """Delete every row of the queue table."""
        with self._db.get_connection() as conn:
            conn.execute(self._flush.format(self.queue_name,))

    def __len__(self):
        """Return the number of rows in the queue table."""
        with self._db.get_connection() as conn:
            return next(conn.execute(self._count.format(self.queue_name)))[0]
class SqliteSchedule(BaseSchedule):
    """
    Schedule stored in a SQLite table of (item, timestamp) rows.
    """
    # SQL templates; {0} is replaced with this schedule's table name.
    _create = """
        CREATE TABLE IF NOT EXISTS {0}
        (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          item BLOB,
          timestamp INTEGER
        )
    """
    _read_items = """
        SELECT item, timestamp FROM {0} WHERE timestamp <= ?
        ORDER BY timestamp
    """
    _delete_items = "DELETE FROM {0} WHERE timestamp <= ?"
    _add_item = "INSERT INTO {0} (item, timestamp) VALUES (?, ?)"
    _flush = "DELETE FROM {0}"

    def __init__(self, name, location):
        """
        Create the schedule table if it does not exist yet.

        :param name: Schedule name; the table is 'huey_schedule_<name>'
        :param location: Path of the SQLite database file
        """
        super(SqliteSchedule, self).__init__(name, location=location)
        self._db = _SqliteDatabase(location)
        # Overwrites the plain name stored by the base class with the
        # table name.
        self.name = 'huey_schedule_{0}'.format(name)
        create_sql = self._create.format(self.name)
        with self._db.get_connection() as conn:
            conn.execute(create_sql)

    def convert_ts(self, ts):
        """Convert the datetime 'ts' to a unix timestamp via time.mktime."""
        as_struct = ts.timetuple()
        return time.mktime(as_struct)

    def add(self, data, ts):
        """Insert 'data' together with the converted timestamp."""
        row = (data, self.convert_ts(ts))
        insert_sql = self._add_item.format(self.name)
        with self._db.get_connection() as conn:
            conn.execute(insert_sql, row)

    def read(self, ts):
        """
        Return the items scheduled at or before 'ts', ordered by timestamp,
        and delete the rows at or before that timestamp.
        """
        cutoff = (self.convert_ts(ts),)
        select_sql = self._read_items.format(self.name)
        delete_sql = self._delete_items.format(self.name)
        with self._db.get_connection() as conn:
            rows = conn.execute(select_sql, cutoff).fetchall()
            conn.execute(delete_sql, cutoff)
        return [row[0] for row in rows]

    def flush(self):
        """Delete every row of the schedule table."""
        flush_sql = self._flush.format(self.name)
        with self._db.get_connection() as conn:
            conn.execute(flush_sql)
class SqliteDataStore(BaseDataStore):
    """
    Data store kept in a SQLite table of (key, result) rows.
    """
    # SQL templates; {0} is replaced with this store's table name.
    _create = """
        CREATE TABLE IF NOT EXISTS {0}
        (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          key TEXT,
          result BLOB
        )
    """
    _put = "INSERT INTO {0} (key, result) VALUES (?, ?)"
    _peek = "SELECT result FROM {0} WHERE key = ?"
    _remove = "DELETE FROM {0} WHERE key = ?"
    _flush = "DELETE FROM {0}"

    def __init__(self, name, location):
        """
        Create the results table if it does not exist yet.

        :param name: Store name; the table is called 'huey_results_<name>'
        :param location: Path of the SQLite database file
        """
        super(SqliteDataStore, self).__init__(name, location=location)
        self._db = _SqliteDatabase(location)
        # Overwrites the plain name stored by the base class with the
        # table name.
        self.name = 'huey_results_{0}'.format(name)
        create_sql = self._create.format(self.name)
        with self._db.get_connection() as conn:
            conn.execute(create_sql)

    def put(self, key, value):
        """Store 'value' under 'key', replacing any rows already there."""
        remove_sql = self._remove.format(self.name)
        insert_sql = self._put.format(self.name)
        with self._db.get_connection() as conn:
            conn.execute(remove_sql, (key,))
            conn.execute(insert_sql, (key, value))

    def peek(self, key):
        """Return the result stored under 'key', or EmptyData if absent."""
        with self._db.get_connection() as conn:
            row = conn.execute(self._peek.format(self.name),
                               (key,)).fetchone()
            if row is None:
                return EmptyData
            return row[0]

    def get(self, key):
        """
        Return the result stored under 'key' and delete its rows, or return
        EmptyData if absent.
        """
        with self._db.get_connection() as conn:
            row = conn.execute(self._peek.format(self.name),
                               (key,)).fetchone()
            if row is None:
                return EmptyData
            conn.execute(self._remove.format(self.name), (key,))
            return row[0]

    def flush(self):
        """Delete every row of the results table."""
        flush_sql = self._flush.format(self.name)
        with self._db.get_connection() as conn:
            conn.execute(flush_sql)
# Component classes for the SQLite backend, listed in the order
# (queue, data store, schedule, event emitter). This module defines no event
# emitter class, so that slot is None.
Components = (SqliteQueue, SqliteDataStore, SqliteSchedule, None)