diff --git a/bazarr/signalr_client.py b/bazarr/signalr_client.py index d0faac538..671a86f7b 100644 --- a/bazarr/signalr_client.py +++ b/bazarr/signalr_client.py @@ -21,27 +21,47 @@ from get_args import args class SonarrSignalrClient(threading.Thread): def __init__(self): super(SonarrSignalrClient, self).__init__() - self.stopped = True self.apikey_sonarr = None self.session = Session() self.connection = None - def stop(self): - self.connection.close() - self.stopped = True - logging.info('BAZARR SignalR client for Sonarr is now disconnected.') - - def restart(self): - if not self.stopped: - self.stop() - if settings.general.getboolean('use_sonarr'): - self.run() - - def run(self): + def start(self): if get_sonarr_version().startswith('2.'): logging.warning('BAZARR can only sync from Sonarr v3 SignalR feed to get real-time update. You should ' 'consider upgrading.') return + + logging.debug('BAZARR connecting to Sonarr SignalR feed...') + self.configure() + while not self.connection.is_open: + try: + self.connection.start() + except ConnectionError: + gevent.sleep(5) + logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.') + if not args.dev: + scheduler.execute_job_now('update_series') + scheduler.execute_job_now('sync_episodes') + + def stop(self, log=True): + try: + self.connection.close() + except Exception as e: + pass + if log: + logging.info('BAZARR SignalR client for Sonarr is now disconnected.') + + def restart(self): + if self.connection.is_open: + self.stop(log=False) + if settings.general.getboolean('use_sonarr'): + self.start() + + def exception_handler(self, type, exception, traceback): + logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...') + self.restart() + + def configure(self): self.apikey_sonarr = settings.sonarr.apikey self.connection = Connection(url_sonarr() + "/signalr", self.session) self.connection.qs = {'apikey': self.apikey_sonarr} @@ -51,25 +71,7 @@ class SonarrSignalrClient(threading.Thread): for item in sonarr_method: sonarr_hub.client.on(item, dispatcher) - while True: - if not self.stopped: - return - if self.connection.started: - gevent.sleep(5) - else: - try: - logging.debug('BAZARR connecting to Sonarr SignalR feed...') - self.connection.start() - except ConnectionError: - logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...') - gevent.sleep(15) - else: - self.stopped = False - logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.') - if not args.dev: - scheduler.execute_job_now('update_series') - scheduler.execute_job_now('sync_episodes') - gevent.sleep() + self.connection.exception += self.exception_handler class RadarrSignalrClient(threading.Thread): @@ -82,7 +84,8 @@ class RadarrSignalrClient(threading.Thread): self.configure() logging.debug('BAZARR connecting to Radarr SignalR feed...') self.connection.start() - gevent.sleep() + if not args.dev: + scheduler.execute_job_now('update_movies') def stop(self): logging.info('BAZARR SignalR client for Radarr is now disconnected.') @@ -92,7 +95,6 @@ class RadarrSignalrClient(threading.Thread): if self.connection.transport.state.value in [0, 1, 2]: self.stop() if settings.general.getboolean('use_radarr'): - self.configure() self.start() def configure(self): diff --git a/libs/signalr/__init__.py b/libs/signalr/__init__.py index 3d155c5c6..7742eeb58 100644 --- a/libs/signalr/__init__.py +++ b/libs/signalr/__init__.py @@ -1,8 +1,3 @@ -from gevent import monkey - -monkey.patch_socket() -monkey.patch_ssl() - from ._connection import Connection -__version__ = '0.0.7' +__version__ = '0.0.12' diff --git a/libs/signalr/_connection.py b/libs/signalr/_connection.py index 0d21adcce..6471ba670 100644 --- a/libs/signalr/_connection.py +++ b/libs/signalr/_connection.py @@ -1,5 +1,6 @@ import json -import gevent +import sys +from threading import Thread from signalr.events import EventHook from signalr.hubs import Hub from signalr.transports import AutoTransport @@ -14,13 +15,16 @@ class Connection: self.qs = {} self.__send_counter = -1 self.token = None + self.id = None self.data = None self.received = EventHook() self.error = EventHook() self.starting = EventHook() self.stopping = EventHook() + self.exception = EventHook() + self.is_open = False self.__transport = AutoTransport(session, self) - self.__greenlet = None + self.__listener_thread = None self.started = False def handle_error(**kwargs): @@ -46,28 +50,32 @@ class Connection: negotiate_data = self.__transport.negotiate() self.token = negotiate_data['ConnectionToken'] + self.id = negotiate_data['ConnectionId'] listener = self.__transport.start() def wrapped_listener(): - try: - listener() - gevent.sleep() - except Exception as e: - gevent.kill(self.__greenlet) - self.started = False - - self.__greenlet = gevent.spawn(wrapped_listener) + while self.is_open: + try: + listener() + except: + self.exception.fire(*sys.exc_info()) + self.is_open = False + + self.is_open = True + self.__listener_thread = Thread(target=wrapped_listener) + self.__listener_thread.start() self.started = True def wait(self, timeout=30): - gevent.joinall([self.__greenlet], timeout) + Thread.join(self.__listener_thread, timeout) def send(self, data): self.__transport.send(data) def close(self): - gevent.kill(self.__greenlet) + self.is_open = False + self.__listener_thread.join() self.__transport.close() def register_hub(self, name): diff --git a/libs/signalr/transports/_sse_transport.py b/libs/signalr/transports/_sse_transport.py index 28f28d1bb..7faaf936a 100644 --- a/libs/signalr/transports/_sse_transport.py +++ b/libs/signalr/transports/_sse_transport.py @@ -1,7 +1,6 @@ import json import sseclient from ._transport import Transport -from requests.exceptions import ConnectionError class ServerSentEventsTransport(Transport): @@ -13,16 +12,18 @@ class ServerSentEventsTransport(Transport): return 'serverSentEvents' def start(self): - self.__response = sseclient.SSEClient(self._get_url('connect'), session=self._session) + connect_url = self._get_url('connect') + self.__response = iter(sseclient.SSEClient(connect_url, session=self._session)) self._session.get(self._get_url('start')) def _receive(): try: - for notification in self.__response: - if notification.data != 'initialized': - self._handle_notification(notification.data) - except ConnectionError: - raise ConnectionError + notification = next(self.__response) + except StopIteration: + return + else: + if notification.data != 'initialized': + self._handle_notification(notification.data) return _receive diff --git a/libs/signalr/transports/_transport.py b/libs/signalr/transports/_transport.py index c0d0d4278..af62672fd 100644 --- a/libs/signalr/transports/_transport.py +++ b/libs/signalr/transports/_transport.py @@ -1,13 +1,12 @@ from abc import abstractmethod import json import sys - +import threading if sys.version_info[0] < 3: from urllib import quote_plus else: from urllib.parse import quote_plus -import gevent class Transport: @@ -48,7 +47,7 @@ class Transport: if len(message) > 0: data = json.loads(message) self._connection.received.fire(**data) - gevent.sleep() + #thread.sleep() #TODO: investigate if we should sleep here def _get_url(self, action, **kwargs): args = kwargs.copy() diff --git a/libs/signalr/transports/_ws_transport.py b/libs/signalr/transports/_ws_transport.py index fca7935da..4d9a80ad1 100644 --- a/libs/signalr/transports/_ws_transport.py +++ b/libs/signalr/transports/_ws_transport.py @@ -1,7 +1,6 @@ import json import sys -import gevent if sys.version_info[0] < 3: from urlparse import urlparse, urlunparse @@ -39,17 +38,14 @@ class WebSocketsTransport(Transport): self._session.get(self._get_url('start')) def _receive(): - try: - for notification in self.ws: - self._handle_notification(notification) - except ConnectionError: - raise ConnectionError + notification = self.ws.recv() + self._handle_notification(notification) return _receive def send(self, data): self.ws.send(json.dumps(data)) - gevent.sleep() + #thread.sleep() #TODO: inveistage if we should sleep here or not def close(self): self.ws.close() diff --git a/libs/version.txt b/libs/version.txt index 6749d13c3..18a4d3f02 100644 --- a/libs/version.txt +++ b/libs/version.txt @@ -29,7 +29,7 @@ rarfile=3.0 rebulk=3.0.1 requests=2.18.4 semver=2.13.0 -signalr-client=0.0.7 <-- Modified to work with Sonarr +signalr-client-threads=0.0.12 <-- Modified to work with Sonarr signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60 SimpleConfigParser=0.1.0 <-- modified version: do not update!!! six=1.11.0