TISbackup/lib/huey/backends/sqlite_backend.py

206 lines
6.9 KiB
Python
Raw Permalink Normal View History

""" SQLite backend for Huey.
Inspired from a snippet by Thiago Arruda [1]
[1] http://flask.pocoo.org/snippets/88/
"""
import json
import sqlite3
import time
try:
from thread import get_ident
except ImportError: # Python 3
try:
from threading import get_ident
except ImportError:
from _thread import get_ident
buffer = memoryview
from huey.backends.base import BaseDataStore
from huey.backends.base import BaseEventEmitter
from huey.backends.base import BaseQueue
from huey.backends.base import BaseSchedule
from huey.utils import EmptyData
class _SqliteDatabase(object):
def __init__(self, location):
if location == ':memory:':
raise ValueError("Database location has to be a file path, "
"in-memory databases are not supported.")
self.location = location
self._conn_cache = {}
with self.get_connection() as conn:
# Enable write-ahead logging
conn.execute("PRAGMA journal_mode=WAL;")
# Hand over syncing responsibility to OS
conn.execute("PRAGMA synchronous=OFF;")
# Store temporary tables and indices in memory
conn.execute("PRAGMA temp_store=MEMORY;")
def get_connection(self, immediate=False):
""" Obtain a sqlite3.Connection instance for the database.
Connections are cached on a by-thread basis, i.e. every calling thread
will always get the same Connection object back.
"""
if immediate:
return sqlite3.Connection(self.location, timeout=60,
isolation_level="IMMEDIATE")
id = get_ident()
if id not in self._conn_cache:
self._conn_cache[id] = sqlite3.Connection(
self.location, timeout=60)
return self._conn_cache[id]
class SqliteQueue(BaseQueue):
"""
A simple Queue that uses SQLite to store messages
"""
_create = """
CREATE TABLE IF NOT EXISTS {0}
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
item BLOB
)
"""
_count = "SELECT COUNT(*) FROM {0}"
_append = "INSERT INTO {0} (item) VALUES (?)"
_get = "SELECT id, item FROM {0} ORDER BY id LIMIT 1"
_remove_by_value = "DELETE FROM {0} WHERE item = ?"
_remove_by_id = "DELETE FROM {0} WHERE id = ?"
_flush = "DELETE FROM {0}"
def __init__(self, name, location):
super(SqliteQueue, self).__init__(name, location=location)
self.queue_name = 'huey_queue_{0}'.format(name)
self._db = _SqliteDatabase(location)
with self._db.get_connection() as conn:
conn.execute(self._create.format(self.queue_name))
def write(self, data):
with self._db.get_connection() as conn:
conn.execute(self._append.format(self.queue_name), (data,))
def read(self):
with self._db.get_connection(immediate=True) as conn:
cursor = conn.execute(self._get.format(self.queue_name))
try:
id, data = next(cursor)
except StopIteration:
return None
if id:
conn.execute(self._remove_by_id.format(self.queue_name), (id,))
return data
def remove(self, data):
with self._db.get_connection() as conn:
return conn.execute(self._remove_by_value.format(self.queue_name),
(data,)).rowcount
def flush(self):
with self._db.get_connection() as conn:
conn.execute(self._flush.format(self.queue_name,))
def __len__(self):
with self._db.get_connection() as conn:
return next(conn.execute(self._count.format(self.queue_name)))[0]
class SqliteSchedule(BaseSchedule):
_create = """
CREATE TABLE IF NOT EXISTS {0}
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
item BLOB,
timestamp INTEGER
)
"""
_read_items = """
SELECT item, timestamp FROM {0} WHERE timestamp <= ?
ORDER BY timestamp
"""
_delete_items = "DELETE FROM {0} WHERE timestamp <= ?"
_add_item = "INSERT INTO {0} (item, timestamp) VALUES (?, ?)"
_flush = "DELETE FROM {0}"
def __init__(self, name, location):
super(SqliteSchedule, self).__init__(name, location=location)
self._db = _SqliteDatabase(location)
self.name = 'huey_schedule_{0}'.format(name)
with self._db.get_connection() as conn:
conn.execute(self._create.format(self.name))
def convert_ts(self, ts):
return time.mktime(ts.timetuple())
def add(self, data, ts):
with self._db.get_connection() as conn:
conn.execute(self._add_item.format(self.name),
(data, self.convert_ts(ts)))
def read(self, ts):
with self._db.get_connection() as conn:
results = conn.execute(self._read_items.format(self.name),
(self.convert_ts(ts),)).fetchall()
conn.execute(self._delete_items.format(self.name),
(self.convert_ts(ts),))
return [data for data, _ in results]
def flush(self):
with self._db.get_connection() as conn:
conn.execute(self._flush.format(self.name))
class SqliteDataStore(BaseDataStore):
_create = """
CREATE TABLE IF NOT EXISTS {0}
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT,
result BLOB
)
"""
_put = "INSERT INTO {0} (key, result) VALUES (?, ?)"
_peek = "SELECT result FROM {0} WHERE key = ?"
_remove = "DELETE FROM {0} WHERE key = ?"
_flush = "DELETE FROM {0}"
def __init__(self, name, location):
super(SqliteDataStore, self).__init__(name, location=location)
self._db = _SqliteDatabase(location)
self.name = 'huey_results_{0}'.format(name)
with self._db.get_connection() as conn:
conn.execute(self._create.format(self.name))
def put(self, key, value):
with self._db.get_connection() as conn:
conn.execute(self._remove.format(self.name), (key,))
conn.execute(self._put.format(self.name), (key, value))
def peek(self, key):
with self._db.get_connection() as conn:
try:
return next(conn.execute(self._peek.format(self.name),
(key,)))[0]
except StopIteration:
return EmptyData
def get(self, key):
with self._db.get_connection() as conn:
try:
data = next(conn.execute(self._peek.format(self.name),
(key,)))[0]
conn.execute(self._remove.format(self.name), (key,))
return data
except StopIteration:
return EmptyData
def flush(self):
with self._db.get_connection() as conn:
conn.execute(self._flush.format(self.name))
Components = (SqliteQueue, SqliteDataStore, SqliteSchedule, None)