154 lines
4.5 KiB
Python
154 lines
4.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
__author__ = 'deathowl'
|
|
|
|
import datetime
|
|
import re
|
|
import time
|
|
import pika
|
|
from pika.exceptions import AMQPConnectionError
|
|
|
|
from huey.backends.base import BaseEventEmitter
|
|
from huey.backends.base import BaseQueue
|
|
|
|
|
|
def clean_name(name):
    """
    Return ``name`` with every character other than ``a-z`` and ``0-9``
    removed.

    Note that upper-case letters are stripped as well, not lower-cased.
    """
    disallowed = re.compile('[^a-z0-9]')
    return disallowed.sub('', name)
|
|
|
|
|
|
class RabbitQueue(BaseQueue):
    """
    A simple Queue that uses RabbitMQ to store messages.

    Messages are published through the default exchange (``''``) with the
    queue name as routing key, and read back with a short-lived consumer
    on the same channel.
    """
    def __init__(self, name, **connection):
        """
        Open a blocking connection and declare the durable queue.

        The queue is named ``'huey.rabbit.<cleaned name>'`` where the
        cleaned name is produced by ``clean_name(name)``.

        ``connection`` accepts the following keys (defaults shown)::

            connection = {
                'host': 'localhost',
                'port': 5672,
                'username': 'guest',
                'password': 'guest',
                'vhost': '/',
                'ssl': False
            }
        """
        super(RabbitQueue, self).__init__(name, **connection)

        self.queue_name = 'huey.rabbit.%s' % clean_name(name)
        credentials = pika.PlainCredentials(
            connection.get('username', 'guest'),
            connection.get('password', 'guest'))
        connection_params = pika.ConnectionParameters(
            host=connection.get('host', 'localhost'),
            port=connection.get('port', 5672),
            credentials=credentials,
            virtual_host=connection.get('vhost', '/'),
            ssl=connection.get('ssl', False))

        self.conn = pika.BlockingConnection(connection_params)
        self.channel = self.conn.channel()
        self.channel.queue_declare(self.queue_name, durable=True)

    def write(self, data):
        """Publish ``data`` as a message body onto this queue."""
        self.channel.basic_publish(
            exchange='',
            routing_key=self.queue_name,
            body=data)

    def read(self):
        """
        Return the body of the next message, or ``None`` when the queue
        reports no messages.
        """
        return self.get_data_from_queue(self.queue_name)

    def remove(self, data):
        """
        Remove queued messages whose body equals ``data``.

        At most ``len(self)`` deliveries (as measured on entry) are
        inspected: matching ones are acknowledged, all others are nacked
        with ``requeue=True``.

        :returns: the number of messages that were acknowledged.
        """
        # This is not something you usually do in rabbit, this is the only
        # operation, which is not atomic, but this "hack" should do the trick.
        amount = 0
        idx = 0
        qlen = len(self)

        # With nothing queued, consume() would wait for a delivery before
        # the ``idx >= qlen`` check below could ever run, so an empty queue
        # must be handled up front (get_data_from_queue does the same).
        if qlen == 0:
            return amount

        for method_frame, _, body in self.channel.consume(self.queue_name):
            idx += 1
            if body == data:
                self.channel.basic_ack(method_frame.delivery_tag)
                amount += 1
            else:
                self.channel.basic_nack(
                    method_frame.delivery_tag,
                    requeue=True)

            # Stop once as many deliveries as were initially queued have
            # been looked at.
            if idx >= qlen:
                break

        self.channel.cancel()
        return amount

    def flush(self):
        """Purge every message from the queue; always returns ``True``."""
        self.channel.queue_purge(queue=self.queue_name)
        return True

    def __len__(self):
        """
        Return the message count reported when (re-)declaring the queue.
        """
        queue = self.channel.queue_declare(self.queue_name, durable=True)
        return queue.method.message_count

    def get_data_from_queue(self, queue):
        """
        Consume, acknowledge and return the body of one message from
        ``queue``.

        Returns ``None`` without consuming when ``len(self)`` is zero.
        Note that the emptiness check always looks at ``self.queue_name``,
        whereas the consumer is started on the ``queue`` argument.
        """
        data = None
        if len(self) == 0:
            return None

        for method_frame, _, body in self.channel.consume(queue):
            data = body
            self.channel.basic_ack(method_frame.delivery_tag)
            # Only a single message is wanted per call.
            break

        self.channel.cancel()
        return data
|
|
|
|
|
|
class RabbitBlockingQueue(RabbitQueue):
    """
    RabbitQueue variant that advertises itself as blocking
    (``blocking = True``) and treats a dropped AMQP connection during a
    read as "no message available".

    NOTE(review): the inherited ``get_data_from_queue`` returns ``None``
    straight away when the queue is empty, so ``read`` only waits inside
    the consumer when a message is expected -- confirm this matches what
    the caller expects from a blocking queue.
    """
    blocking = True

    def read(self):
        """
        Return the next message body, or ``None`` if the queue is empty or
        an ``AMQPConnectionError`` is raised while reading.
        """
        try:
            payload = self.get_data_from_queue(self.queue_name)
        except AMQPConnectionError:
            payload = None
        return payload
|
|
|
|
|
|
class RabbitEventEmitter(BaseEventEmitter):
    """
    Event emitter that publishes messages to the durable, non-auto-deleted
    fanout exchange ``huey.events``.
    """
    def __init__(self, channel, **connection):
        """
        Connect to RabbitMQ and declare the events exchange.

        ``channel`` is only handed to the base initialiser; afterwards
        ``self.channel`` is rebound to the pika channel opened here.
        ``connection`` understands the keys ``host``, ``port``,
        ``username``, ``password``, ``vhost`` and ``ssl`` (defaults:
        ``'localhost'``, ``5672``, ``'guest'``, ``'guest'``, ``'/'``,
        ``False``).
        """
        super(RabbitEventEmitter, self).__init__(channel, **connection)

        username = connection.get('username', 'guest')
        password = connection.get('password', 'guest')
        credentials = pika.PlainCredentials(username, password)

        host = connection.get('host', 'localhost')
        port = connection.get('port', 5672)
        vhost = connection.get('vhost', '/')
        use_ssl = connection.get('ssl', False)
        connection_params = pika.ConnectionParameters(
            host=host,
            port=port,
            credentials=credentials,
            virtual_host=vhost,
            ssl=use_ssl)

        self.conn = pika.BlockingConnection(connection_params)
        self.channel = self.conn.channel()
        self.exchange_name = 'huey.events'

        exchange_options = {
            'exchange': self.exchange_name,
            'type': 'fanout',
            'auto_delete': False,
            'durable': True,
        }
        self.channel.exchange_declare(**exchange_options)

    def emit(self, message):
        """
        Publish ``message`` to the events exchange as ``text/plain`` with
        ``delivery_mode=2`` and an empty routing key.
        """
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key='',
            body=message,
            properties=pika.BasicProperties(
                content_type="text/plain",
                delivery_mode=2))
|
|
|
|
|
|
# Backend components exported by this module: the blocking queue class in
# the first slot and the event emitter in the last; the two middle slots
# are intentionally left as None (nothing in this module provides them).
# NOTE(review): presumably the slot order is (queue, result store,
# schedule, event emitter) as expected by huey's backend loader -- confirm.
Components = (RabbitBlockingQueue, None, None, RabbitEventEmitter)
|