Improved live synchronization with Sonarr and Radarr trough SignalR feed by implementing dedicated threaded events queues, by preventing execution of duplicate events received, by filtering events to be processed and by logging at debug level events received to improve debugging.

pull/1930/head
morpheus65535 2 years ago
parent c6efda0f75
commit 180a40e027

@ -3,11 +3,14 @@
import logging
import json
import time
import threading
from requests import Session
from signalr import Connection
from requests.exceptions import ConnectionError
from signalrcore.hub_connection_builder import HubConnectionBuilder
from collections import deque
from time import sleep
from constants import headers
from sonarr.sync.episodes import sync_episodes, sync_one_episode
@ -21,6 +24,12 @@ from .scheduler import scheduler
from .get_args import args
sonarr_queue = deque()
radarr_queue = deque()
last_event_data = None
class SonarrSignalrClientLegacy:
def __init__(self):
super(SonarrSignalrClientLegacy, self).__init__()
@ -46,8 +55,9 @@ class SonarrSignalrClientLegacy:
except json.decoder.JSONDecodeError:
logging.error("BAZARR cannot parse JSON returned by SignalR feed. This is caused by a permissions "
"issue when Sonarr try to access its /config/.config directory."
"Typically permissions are too permissive - only the user and group Sonarr runs as should have Read/Write permissions (e.g. files 664 / folders 775)"
"You should fix permissions on that directory and restart Sonarr. Also, if you're a Docker image "
"Typically permissions are too permissive - only the user and group Sonarr runs as "
"should have Read/Write permissions (e.g. files 664 / folders 775). You should fix "
"permissions on that directory and restart Sonarr. Also, if you're a Docker image "
"user, you should make sure you properly defined PUID/PGID environment variables. "
"Otherwise, please contact Sonarr support.")
else:
@ -61,21 +71,19 @@ class SonarrSignalrClientLegacy:
try:
self.connection.close()
except Exception:
pass
self.connection.started = False
if log:
logging.info('BAZARR SignalR client for Sonarr is now disconnected.')
def restart(self):
if self.connection:
if self.connection.started:
try:
self.stop(log=False)
except Exception:
self.connection.started = False
self.stop(log=False)
if settings.general.getboolean('use_sonarr'):
self.start()
def exception_handler(self, type, exception, traceback):
def exception_handler(self):
sonarr_queue.clear()
logging.error('BAZARR connection to Sonarr SignalR feed has been lost.')
self.restart()
@ -87,7 +95,7 @@ class SonarrSignalrClientLegacy:
sonarr_method = ['series', 'episode']
for item in sonarr_method:
sonarr_hub.client.on(item, dispatcher)
sonarr_hub.client.on(item, feed_queue)
self.connection.exception += self.exception_handler
@ -119,6 +127,7 @@ class SonarrSignalrClient:
self.start()
def exception_handler(self):
sonarr_queue.clear()
logging.error("BAZARR connection to Sonarr SignalR feed has failed. We'll try to reconnect.")
self.restart()
@ -148,7 +157,7 @@ class SonarrSignalrClient:
'Trying to reconnect...'))
self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Sonarr is disconnected.'))
self.connection.on_error(self.exception_handler)
self.connection.on("receiveMessage", dispatcher)
self.connection.on("receiveMessage", feed_queue)
class RadarrSignalrClient:
@ -178,6 +187,7 @@ class RadarrSignalrClient:
self.start()
def exception_handler(self):
radarr_queue.clear()
logging.error("BAZARR connection to Radarr SignalR feed has failed. We'll try to reconnect.")
self.restart()
@ -206,38 +216,49 @@ class RadarrSignalrClient:
'Trying to reconnect...'))
self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Radarr is disconnected.'))
self.connection.on_error(self.exception_handler)
self.connection.on("receiveMessage", dispatcher)
self.connection.on("receiveMessage", feed_queue)
def dispatcher(data):
try:
topic = media_id = action = None
episodesChanged = None
if isinstance(data, dict):
series_title = series_year = episode_title = season_number = episode_number = movie_title = movie_year = None
#
try:
episodesChanged = False
topic = data['name']
try:
media_id = data['body']['resource']['id']
action = data['body']['action']
media_id = data['body']['resource']['id']
action = data['body']['action']
if topic == 'series':
if 'episodesChanged' in data['body']['resource']:
episodesChanged = data['body']['resource']['episodesChanged']
except KeyError:
return
elif isinstance(data, list):
topic = data[0]['name']
try:
media_id = data[0]['body']['resource']['id']
action = data[0]['body']['action']
except KeyError:
return
series_title = data['body']['resource']['title']
series_year = data['body']['resource']['year']
elif topic == 'episode':
series_title = data['body']['resource']['series']['title']
series_year = data['body']['resource']['series']['year']
episode_title = data['body']['resource']['title']
season_number = data['body']['resource']['seasonNumber']
episode_number = data['body']['resource']['episodeNumber']
elif topic == 'movie':
movie_title = data['body']['resource']['title']
movie_year = data['body']['resource']['year']
except KeyError:
return
if topic == 'series':
logging.debug(f'Event received from Sonarr for series: {series_title} ({series_year})')
update_one_series(series_id=media_id, action=action)
if episodesChanged:
# this will happen if a season monitored status is changed.
sync_episodes(series_id=media_id, send_event=True)
elif topic == 'episode':
logging.debug(f'Event received from Sonarr for episode: {series_title} ({series_year}) - '
f'S{season_number:0>2}E{episode_number:0>2} - {episode_title}')
sync_one_episode(episode_id=media_id, defer_search=settings.sonarr.getboolean('defer_search_signalr'))
elif topic == 'movie':
logging.debug(f'Event received from Radarr for movie: {movie_title} ({movie_year})')
update_one_movie(movie_id=media_id, action=action,
defer_search=settings.radarr.getboolean('defer_search_signalr'))
except Exception as e:
@ -246,6 +267,43 @@ def dispatcher(data):
return
def feed_queue(data):
# check if event is duplicate from the previous one
global last_event_data
if data == last_event_data:
return
else:
last_event_data = data
# some sonarr version send event as a list of a single dict, we make it a dict
if isinstance(data, list) and len(data):
data = data[0]
# if data is a dict and contain an event for series, episode or movie, we add it to the event queue
if isinstance(data, dict) and 'name' in data:
if data['name'] in ['series', 'episode']:
sonarr_queue.append(data)
elif data['name'] == 'movie':
radarr_queue.append(data)
def consume_queue(queue):
# get events data from queue one at a time and dispatch it
while True:
try:
data = queue.popleft()
except IndexError:
pass
else:
dispatcher(data)
sleep(0.1)
# start both queue consuming threads
threading.Thread(target=consume_queue, args=(sonarr_queue,)).start()
threading.Thread(target=consume_queue, args=(radarr_queue,)).start()
# instantiate proper SignalR client
sonarr_signalr_client = SonarrSignalrClientLegacy() if get_sonarr_info.version().startswith(('0.', '2.', '3.')) else \
SonarrSignalrClient()
radarr_signalr_client = RadarrSignalrClient()

@ -91,7 +91,11 @@ def get_series_from_sonarr_api(url, apikey_sonarr, sonarr_series_id=None):
logging.exception("BAZARR Error trying to get series from Sonarr.")
return
else:
return r.json()
result = r.json()
if isinstance(result, dict):
return list(result)
else:
return r.json()
def get_episodes_from_sonarr_api(url, apikey_sonarr, series_id=None, episode_id=None):

Loading…
Cancel
Save