You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/socketio/zmq_manager.py

111 lines
3.6 KiB

5 years ago
import pickle
import re
try:
import eventlet.green.zmq as zmq
except ImportError:
zmq = None
from .pubsub_manager import PubSubManager
class ZmqManager(PubSubManager):  # pragma: no cover
    """zmq based client manager.

    NOTE: this zmq implementation should be considered experimental at this
    time. At this time, eventlet is required to use zmq.

    This class implements a zmq backend for event sharing across multiple
    processes. To use a zmq backend, initialize the :class:`Server` instance as
    follows::

        url = 'zmq+tcp://hostname:port1+port2'
        server = socketio.Server(client_manager=socketio.ZmqManager(url))

    :param url: The connection URL for the zmq message broker,
                which will need to be provided and running. ``port1`` is
                the port messages are pushed to, ``port2`` is the port
                messages are subscribed from.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    :param logger: Passed through unchanged to the parent class constructor.

    A zmq message broker must be running for the zmq_manager to work.
    You can write your own or adapt one from the following simple broker
    below::

        import zmq

        receiver = zmq.Context().socket(zmq.PULL)
        receiver.bind("tcp://*:5555")

        publisher = zmq.Context().socket(zmq.PUB)
        publisher.bind("tcp://*:5556")

        while True:
            publisher.send(receiver.recv())

    Messages are serialized with :mod:`pickle`, so the broker and every peer
    connected to it must be trusted.
    """
    name = 'zmq'

    def __init__(self, url='zmq+tcp://localhost:5555+5556',
                 channel='socketio',
                 write_only=False,
                 logger=None):
        """Parse ``url``, connect the PUSH and SUB sockets, and initialize
        the parent manager.

        :raises RuntimeError: if the zmq package could not be imported, or
                              if ``url`` is not of the form
                              ``zmq+tcp://host:port1+port2``.
        """
        if zmq is None:
            raise RuntimeError('zmq package is not installed '
                               '(Run "pip install pyzmq" in your '
                               'virtualenv).')

        if not (url.startswith('zmq+tcp://') and
                re.search(r':\d+\+\d+$', url)):
            raise RuntimeError('unexpected connection string: ' + url)

        # Strip only the leading 'zmq+' marker; the validation above
        # guarantees it is present at position 0.
        url = url[len('zmq+'):]
        # The validation above guarantees the URL ends in ':<port>+<port>',
        # so splitting from the right always yields exactly these pieces.
        sink_url, sub_port = url.rsplit('+', 1)
        # Swap only the trailing port. A plain str.replace() of the port
        # digits would also rewrite matching digits inside the host part
        # (e.g. 'tcp://10.0.55.1:55').
        endpoint = sink_url.rpartition(':')[0]
        sub_url = endpoint + ':' + sub_port

        # Each socket is created on its own zmq.Context().
        sink = zmq.Context().socket(zmq.PUSH)
        sink.connect(sink_url)

        sub = zmq.Context().socket(zmq.SUB)
        # An empty subscription prefix matches every incoming message.
        sub.setsockopt_string(zmq.SUBSCRIBE, u'')
        sub.connect(sub_url)

        self.sink = sink
        self.sub = sub
        self.channel = channel
        super(ZmqManager, self).__init__(channel=channel,
                                         write_only=write_only,
                                         logger=logger)

    def _publish(self, data):
        """Pickle ``data`` inside a ``{'type', 'channel', 'data'}`` envelope
        and send it on the PUSH socket.

        :returns: whatever ``self.sink.send()`` returns.
        """
        pickled_data = pickle.dumps(
            {
                'type': 'message',
                'channel': self.channel,
                'data': data
            }
        )
        return self.sink.send(pickled_data)

    def zmq_listen(self):
        """Yield every non-``None`` frame received on the SUB socket.

        This generator loops forever; it only ends if ``recv()`` raises.
        """
        while True:
            response = self.sub.recv()
            if response is not None:
                yield response

    def _listen(self):
        """Yield the ``data`` payload of each message addressed to this
        manager's channel.

        Frames that cannot be unpickled, are not dicts, or do not carry the
        expected ``type``/``channel``/``data`` fields are silently skipped.
        """
        for message in self.zmq_listen():
            if isinstance(message, bytes):
                # NOTE(security): pickle.loads() can execute arbitrary code
                # from the payload. This is only safe when the broker and
                # all publishers are trusted.
                try:
                    message = pickle.loads(message)
                except Exception:
                    # Deliberate best-effort: pickle.loads() can raise many
                    # different exception types on malformed input, and a
                    # frame that is not a valid pickle is simply not ours.
                    continue
            if not isinstance(message, dict):
                continue
            # Use .get() so a dict without the expected keys is skipped
            # instead of raising KeyError and terminating the listener.
            if (message.get('type') == 'message' and
                    message.get('channel') == self.channel and
                    'data' in message):
                yield message['data']