Improved stability and reliability of SignalR feed clients.

pull/1427/head
morpheus65535 4 years ago
parent 22a7541543
commit 736b67fd80

@ -197,9 +197,9 @@ def proxy(protocol, url):
if settings.general.getboolean('use_sonarr'):
sonarr_signalr_client.start()
gevent.Greenlet.spawn(sonarr_signalr_client.start)
if settings.general.getboolean('use_radarr'):
radarr_signalr_client.start()
gevent.Greenlet.spawn(radarr_signalr_client.start)
if __name__ == "__main__":

@ -38,16 +38,16 @@ class SonarrSignalrClient:
'consider upgrading.')
return
logging.debug('BAZARR connecting to Sonarr SignalR feed...')
logging.info('BAZARR trying to connect to Sonarr SignalR feed...')
self.configure()
while not self.connection.is_open:
while not self.connection.started:
try:
self.connection.start()
except ConnectionError:
gevent.sleep(5)
except json.decoder.JSONDecodeError:
logging.error('BAZARR cannot parse JSON returned by SignalR feed. Take a look at: '
'https://forums.sonarr.tv/t/signalr-problem/5785/3')
logging.error("BAZARR cannot parse JSON returned by SignalR feed. This is a known issue when Sonarr "
"doesn't have write permission to it's /config/xdg directory.")
self.stop()
logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
if not args.dev:
@ -64,13 +64,16 @@ class SonarrSignalrClient:
def restart(self):
if self.connection:
if self.connection.is_open:
self.stop(log=False)
if self.connection.started:
try:
self.stop(log=False)
except:
self.connection.started = False
if settings.general.getboolean('use_sonarr'):
self.start()
def exception_handler(self, type, exception, traceback):
logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...')
logging.error('BAZARR connection to Sonarr SignalR feed has been lost.')
self.restart()
def configure(self):
@ -94,8 +97,12 @@ class RadarrSignalrClient:
def start(self):
self.configure()
logging.debug('BAZARR connecting to Radarr SignalR feed...')
self.connection.start()
logging.info('BAZARR trying to connect to Radarr SignalR feed...')
while self.connection.transport.state.value not in [0, 1, 2]:
try:
self.connection.start()
except ConnectionError:
gevent.sleep(5)
def stop(self):
logging.info('BAZARR SignalR client for Radarr is now disconnected.')
@ -133,8 +140,8 @@ class RadarrSignalrClient:
"max_attempts": None
}).build()
self.connection.on_open(self.on_connect_handler)
self.connection.on_reconnect(lambda: logging.info('BAZARR SignalR client for Radarr connection as been lost. '
'Trying to reconnect...'))
self.connection.on_reconnect(lambda: logging.error('BAZARR SignalR client for Radarr connection as been lost. '
'Trying to reconnect...'))
self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Radarr is disconnected.'))
self.connection.on_error(self.exception_handler)
self.connection.on("receiveMessage", dispatcher)

@ -1,3 +1,8 @@
from gevent import monkey
monkey.patch_socket()
monkey.patch_ssl()
from ._connection import Connection
__version__ = '0.0.12'
__version__ = '0.0.7'

@ -1,6 +1,6 @@
import json
import gevent
import sys
from threading import Thread
from signalr.events import EventHook
from signalr.hubs import Hub
from signalr.transports import AutoTransport
@ -15,16 +15,14 @@ class Connection:
self.qs = {}
self.__send_counter = -1
self.token = None
self.id = None
self.data = None
self.received = EventHook()
self.error = EventHook()
self.starting = EventHook()
self.stopping = EventHook()
self.exception = EventHook()
self.is_open = False
self.__transport = AutoTransport(session, self)
self.__listener_thread = None
self.__greenlet = None
self.started = False
def handle_error(**kwargs):
@ -50,32 +48,27 @@ class Connection:
negotiate_data = self.__transport.negotiate()
self.token = negotiate_data['ConnectionToken']
self.id = negotiate_data['ConnectionId']
listener = self.__transport.start()
def wrapped_listener():
while self.is_open:
try:
listener()
except:
self.exception.fire(*sys.exc_info())
self.is_open = False
self.is_open = True
self.__listener_thread = Thread(target=wrapped_listener)
self.__listener_thread.start()
try:
listener()
gevent.sleep()
except:
self.exception.fire(*sys.exc_info())
self.__greenlet = gevent.spawn(wrapped_listener)
self.started = True
def wait(self, timeout=30):
Thread.join(self.__listener_thread, timeout)
gevent.joinall([self.__greenlet], timeout)
def send(self, data):
self.__transport.send(data)
def close(self):
self.is_open = False
self.__listener_thread.join()
gevent.kill(self.__greenlet)
self.__transport.close()
def register_hub(self, name):

@ -12,16 +12,11 @@ class ServerSentEventsTransport(Transport):
return 'serverSentEvents'
def start(self):
connect_url = self._get_url('connect')
self.__response = iter(sseclient.SSEClient(connect_url, session=self._session))
self.__response = sseclient.SSEClient(self._get_url('connect'), session=self._session)
self._session.get(self._get_url('start'))
def _receive():
try:
notification = next(self.__response)
except StopIteration:
return
else:
for notification in self.__response:
if notification.data != 'initialized':
self._handle_notification(notification.data)

@ -1,12 +1,13 @@
from abc import abstractmethod
import json
import sys
import threading
if sys.version_info[0] < 3:
from urllib import quote_plus
else:
from urllib.parse import quote_plus
import gevent
class Transport:
@ -47,7 +48,7 @@ class Transport:
if len(message) > 0:
data = json.loads(message)
self._connection.received.fire(**data)
#thread.sleep() #TODO: investigate if we should sleep here
gevent.sleep()
def _get_url(self, action, **kwargs):
args = kwargs.copy()

@ -1,6 +1,7 @@
import json
import sys
import gevent
if sys.version_info[0] < 3:
from urlparse import urlparse, urlunparse
@ -38,14 +39,14 @@ class WebSocketsTransport(Transport):
self._session.get(self._get_url('start'))
def _receive():
notification = self.ws.recv()
self._handle_notification(notification)
for notification in self.ws:
self._handle_notification(notification)
return _receive
def send(self, data):
self.ws.send(json.dumps(data))
#thread.sleep() #TODO: inveistage if we should sleep here or not
gevent.sleep()
def close(self):
self.ws.close()

@ -30,7 +30,7 @@ rarfile=3.0
rebulk=3.0.1
requests=2.18.4
semver=2.13.0
signalr-client-threads=0.0.12 <-- Modified to work with Sonarr
signalr-client=0.0.7 <-- Modified to work with Sonarr and added exception handler
signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60 and 61
SimpleConfigParser=0.1.0 <-- modified version: do not update!!!
six=1.11.0

Loading…
Cancel
Save