Improved reconnection process for Sonarr SignalR feeds.

pull/1409/head
morpheus65535 4 years ago
parent 85c2cbc7da
commit 44dd478c48

@ -21,27 +21,47 @@ from get_args import args
class SonarrSignalrClient(threading.Thread): class SonarrSignalrClient(threading.Thread):
def __init__(self): def __init__(self):
super(SonarrSignalrClient, self).__init__() super(SonarrSignalrClient, self).__init__()
self.stopped = True
self.apikey_sonarr = None self.apikey_sonarr = None
self.session = Session() self.session = Session()
self.connection = None self.connection = None
def stop(self): def start(self):
self.connection.close()
self.stopped = True
logging.info('BAZARR SignalR client for Sonarr is now disconnected.')
def restart(self):
if not self.stopped:
self.stop()
if settings.general.getboolean('use_sonarr'):
self.run()
def run(self):
if get_sonarr_version().startswith('2.'): if get_sonarr_version().startswith('2.'):
logging.warning('BAZARR can only sync from Sonarr v3 SignalR feed to get real-time update. You should ' logging.warning('BAZARR can only sync from Sonarr v3 SignalR feed to get real-time update. You should '
'consider upgrading.') 'consider upgrading.')
return return
logging.debug('BAZARR connecting to Sonarr SignalR feed...')
self.configure()
while not self.connection.is_open:
try:
self.connection.start()
except ConnectionError:
gevent.sleep(5)
logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
if not args.dev:
scheduler.execute_job_now('update_series')
scheduler.execute_job_now('sync_episodes')
def stop(self, log=True):
try:
self.connection.close()
except Exception as e:
pass
if log:
logging.info('BAZARR SignalR client for Sonarr is now disconnected.')
def restart(self):
if self.connection.is_open:
self.stop(log=False)
if settings.general.getboolean('use_sonarr'):
self.start()
def exception_handler(self, type, exception, traceback):
logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...')
self.restart()
def configure(self):
self.apikey_sonarr = settings.sonarr.apikey self.apikey_sonarr = settings.sonarr.apikey
self.connection = Connection(url_sonarr() + "/signalr", self.session) self.connection = Connection(url_sonarr() + "/signalr", self.session)
self.connection.qs = {'apikey': self.apikey_sonarr} self.connection.qs = {'apikey': self.apikey_sonarr}
@ -51,25 +71,7 @@ class SonarrSignalrClient(threading.Thread):
for item in sonarr_method: for item in sonarr_method:
sonarr_hub.client.on(item, dispatcher) sonarr_hub.client.on(item, dispatcher)
while True: self.connection.exception += self.exception_handler
if not self.stopped:
return
if self.connection.started:
gevent.sleep(5)
else:
try:
logging.debug('BAZARR connecting to Sonarr SignalR feed...')
self.connection.start()
except ConnectionError:
logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...')
gevent.sleep(15)
else:
self.stopped = False
logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
if not args.dev:
scheduler.execute_job_now('update_series')
scheduler.execute_job_now('sync_episodes')
gevent.sleep()
class RadarrSignalrClient(threading.Thread): class RadarrSignalrClient(threading.Thread):
@ -82,7 +84,8 @@ class RadarrSignalrClient(threading.Thread):
self.configure() self.configure()
logging.debug('BAZARR connecting to Radarr SignalR feed...') logging.debug('BAZARR connecting to Radarr SignalR feed...')
self.connection.start() self.connection.start()
gevent.sleep() if not args.dev:
scheduler.execute_job_now('update_movies')
def stop(self): def stop(self):
logging.info('BAZARR SignalR client for Radarr is now disconnected.') logging.info('BAZARR SignalR client for Radarr is now disconnected.')
@ -92,7 +95,6 @@ class RadarrSignalrClient(threading.Thread):
if self.connection.transport.state.value in [0, 1, 2]: if self.connection.transport.state.value in [0, 1, 2]:
self.stop() self.stop()
if settings.general.getboolean('use_radarr'): if settings.general.getboolean('use_radarr'):
self.configure()
self.start() self.start()
def configure(self): def configure(self):

@ -1,8 +1,3 @@
from gevent import monkey
monkey.patch_socket()
monkey.patch_ssl()
from ._connection import Connection from ._connection import Connection
__version__ = '0.0.7' __version__ = '0.0.12'

@ -1,5 +1,6 @@
import json import json
import gevent import sys
from threading import Thread
from signalr.events import EventHook from signalr.events import EventHook
from signalr.hubs import Hub from signalr.hubs import Hub
from signalr.transports import AutoTransport from signalr.transports import AutoTransport
@ -14,13 +15,16 @@ class Connection:
self.qs = {} self.qs = {}
self.__send_counter = -1 self.__send_counter = -1
self.token = None self.token = None
self.id = None
self.data = None self.data = None
self.received = EventHook() self.received = EventHook()
self.error = EventHook() self.error = EventHook()
self.starting = EventHook() self.starting = EventHook()
self.stopping = EventHook() self.stopping = EventHook()
self.exception = EventHook()
self.is_open = False
self.__transport = AutoTransport(session, self) self.__transport = AutoTransport(session, self)
self.__greenlet = None self.__listener_thread = None
self.started = False self.started = False
def handle_error(**kwargs): def handle_error(**kwargs):
@ -46,28 +50,32 @@ class Connection:
negotiate_data = self.__transport.negotiate() negotiate_data = self.__transport.negotiate()
self.token = negotiate_data['ConnectionToken'] self.token = negotiate_data['ConnectionToken']
self.id = negotiate_data['ConnectionId']
listener = self.__transport.start() listener = self.__transport.start()
def wrapped_listener(): def wrapped_listener():
try: while self.is_open:
listener() try:
gevent.sleep() listener()
except Exception as e: except:
gevent.kill(self.__greenlet) self.exception.fire(*sys.exc_info())
self.started = False self.is_open = False
self.__greenlet = gevent.spawn(wrapped_listener) self.is_open = True
self.__listener_thread = Thread(target=wrapped_listener)
self.__listener_thread.start()
self.started = True self.started = True
def wait(self, timeout=30): def wait(self, timeout=30):
gevent.joinall([self.__greenlet], timeout) Thread.join(self.__listener_thread, timeout)
def send(self, data): def send(self, data):
self.__transport.send(data) self.__transport.send(data)
def close(self): def close(self):
gevent.kill(self.__greenlet) self.is_open = False
self.__listener_thread.join()
self.__transport.close() self.__transport.close()
def register_hub(self, name): def register_hub(self, name):

@ -1,7 +1,6 @@
import json import json
import sseclient import sseclient
from ._transport import Transport from ._transport import Transport
from requests.exceptions import ConnectionError
class ServerSentEventsTransport(Transport): class ServerSentEventsTransport(Transport):
@ -13,16 +12,18 @@ class ServerSentEventsTransport(Transport):
return 'serverSentEvents' return 'serverSentEvents'
def start(self): def start(self):
self.__response = sseclient.SSEClient(self._get_url('connect'), session=self._session) connect_url = self._get_url('connect')
self.__response = iter(sseclient.SSEClient(connect_url, session=self._session))
self._session.get(self._get_url('start')) self._session.get(self._get_url('start'))
def _receive(): def _receive():
try: try:
for notification in self.__response: notification = next(self.__response)
if notification.data != 'initialized': except StopIteration:
self._handle_notification(notification.data) return
except ConnectionError: else:
raise ConnectionError if notification.data != 'initialized':
self._handle_notification(notification.data)
return _receive return _receive

@ -1,13 +1,12 @@
from abc import abstractmethod from abc import abstractmethod
import json import json
import sys import sys
import threading
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
from urllib import quote_plus from urllib import quote_plus
else: else:
from urllib.parse import quote_plus from urllib.parse import quote_plus
import gevent
class Transport: class Transport:
@ -48,7 +47,7 @@ class Transport:
if len(message) > 0: if len(message) > 0:
data = json.loads(message) data = json.loads(message)
self._connection.received.fire(**data) self._connection.received.fire(**data)
gevent.sleep() #thread.sleep() #TODO: investigate if we should sleep here
def _get_url(self, action, **kwargs): def _get_url(self, action, **kwargs):
args = kwargs.copy() args = kwargs.copy()

@ -1,7 +1,6 @@
import json import json
import sys import sys
import gevent
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
from urlparse import urlparse, urlunparse from urlparse import urlparse, urlunparse
@ -39,17 +38,14 @@ class WebSocketsTransport(Transport):
self._session.get(self._get_url('start')) self._session.get(self._get_url('start'))
def _receive(): def _receive():
try: notification = self.ws.recv()
for notification in self.ws: self._handle_notification(notification)
self._handle_notification(notification)
except ConnectionError:
raise ConnectionError
return _receive return _receive
def send(self, data): def send(self, data):
self.ws.send(json.dumps(data)) self.ws.send(json.dumps(data))
gevent.sleep() #thread.sleep() #TODO: inveistage if we should sleep here or not
def close(self): def close(self):
self.ws.close() self.ws.close()

@ -29,7 +29,7 @@ rarfile=3.0
rebulk=3.0.1 rebulk=3.0.1
requests=2.18.4 requests=2.18.4
semver=2.13.0 semver=2.13.0
signalr-client=0.0.7 <-- Modified to work with Sonarr signalr-client-threads=0.0.12 <-- Modified to work with Sonarr
signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60 signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60
SimpleConfigParser=0.1.0 <-- modified version: do not update!!! SimpleConfigParser=0.1.0 <-- modified version: do not update!!!
six=1.11.0 six=1.11.0

Loading…
Cancel
Save