import pickle import re from .pubsub_manager import PubSubManager class ZmqManager(PubSubManager): # pragma: no cover """zmq based client manager. NOTE: this zmq implementation should be considered experimental at this time. At this time, eventlet is required to use zmq. This class implements a zmq backend for event sharing across multiple processes. To use a zmq backend, initialize the :class:`Server` instance as follows:: url = 'zmq+tcp://hostname:port1+port2' server = socketio.Server(client_manager=socketio.ZmqManager(url)) :param url: The connection URL for the zmq message broker, which will need to be provided and running. :param channel: The channel name on which the server sends and receives notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. A zmq message broker must be running for the zmq_manager to work. you can write your own or adapt one from the following simple broker below:: import zmq receiver = zmq.Context().socket(zmq.PULL) receiver.bind("tcp://*:5555") publisher = zmq.Context().socket(zmq.PUB) publisher.bind("tcp://*:5556") while True: publisher.send(receiver.recv()) """ name = 'zmq' def __init__(self, url='zmq+tcp://localhost:5555+5556', channel='socketio', write_only=False, logger=None): try: from eventlet.green import zmq except ImportError: raise RuntimeError('zmq package is not installed ' '(Run "pip install pyzmq" in your ' 'virtualenv).') r = re.compile(r':\d+\+\d+$') if not (url.startswith('zmq+tcp://') and r.search(url)): raise RuntimeError('unexpected connection string: ' + url) url = url.replace('zmq+', '') (sink_url, sub_port) = url.split('+') sink_port = sink_url.split(':')[-1] sub_url = sink_url.replace(sink_port, sub_port) sink = zmq.Context().socket(zmq.PUSH) sink.connect(sink_url) sub = zmq.Context().socket(zmq.SUB) sub.setsockopt_string(zmq.SUBSCRIBE, u'') sub.connect(sub_url) self.sink = sink self.sub = sub self.channel = channel super().__init__(channel=channel, write_only=write_only, logger=logger) def _publish(self, data): pickled_data = pickle.dumps( { 'type': 'message', 'channel': self.channel, 'data': data } ) return self.sink.send(pickled_data) def zmq_listen(self): while True: response = self.sub.recv() if response is not None: yield response def _listen(self): for message in self.zmq_listen(): if isinstance(message, bytes): try: message = pickle.loads(message) except Exception: pass if isinstance(message, dict) and \ message['type'] == 'message' and \ message['channel'] == self.channel and \ 'data' in message: yield message['data'] return