From e815f47e4fda60270982805ba9acbac6cf4045dd Mon Sep 17 00:00:00 2001 From: Michiel van Baak Date: Mon, 10 Feb 2020 10:42:25 +0100 Subject: [PATCH] Remove unused lib plex_activity --- libs/plex_activity/__init__.py | 15 - libs/plex_activity/activity.py | 96 ------ libs/plex_activity/core/__init__.py | 0 libs/plex_activity/core/helpers.py | 44 --- libs/plex_activity/sources/__init__.py | 4 - libs/plex_activity/sources/base.py | 24 -- .../sources/s_logging/__init__.py | 3 - libs/plex_activity/sources/s_logging/main.py | 249 --------------- .../sources/s_logging/parsers/__init__.py | 4 - .../sources/s_logging/parsers/base.py | 96 ------ .../sources/s_logging/parsers/now_playing.py | 116 ------- .../sources/s_logging/parsers/scrobble.py | 38 --- .../sources/s_websocket/__init__.py | 3 - .../plex_activity/sources/s_websocket/main.py | 298 ------------------ 14 files changed, 990 deletions(-) delete mode 100644 libs/plex_activity/__init__.py delete mode 100644 libs/plex_activity/activity.py delete mode 100644 libs/plex_activity/core/__init__.py delete mode 100644 libs/plex_activity/core/helpers.py delete mode 100644 libs/plex_activity/sources/__init__.py delete mode 100644 libs/plex_activity/sources/base.py delete mode 100644 libs/plex_activity/sources/s_logging/__init__.py delete mode 100644 libs/plex_activity/sources/s_logging/main.py delete mode 100644 libs/plex_activity/sources/s_logging/parsers/__init__.py delete mode 100644 libs/plex_activity/sources/s_logging/parsers/base.py delete mode 100644 libs/plex_activity/sources/s_logging/parsers/now_playing.py delete mode 100644 libs/plex_activity/sources/s_logging/parsers/scrobble.py delete mode 100644 libs/plex_activity/sources/s_websocket/__init__.py delete mode 100644 libs/plex_activity/sources/s_websocket/main.py diff --git a/libs/plex_activity/__init__.py b/libs/plex_activity/__init__.py deleted file mode 100644 index 4da218f59..000000000 --- a/libs/plex_activity/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -import logging -import traceback - -log = logging.getLogger(__name__) - -__version__ = '0.7.1' - - -try: - from plex_activity import activity - - # Global objects (using defaults) - Activity = activity.Activity() -except Exception as ex: - log.warn('Unable to import submodules: %s - %s', ex, traceback.format_exc()) diff --git a/libs/plex_activity/activity.py b/libs/plex_activity/activity.py deleted file mode 100644 index f85632a5e..000000000 --- a/libs/plex_activity/activity.py +++ /dev/null @@ -1,96 +0,0 @@ -from plex.lib import six as six -from plex.lib.six.moves import xrange -from plex_activity.sources import Logging, WebSocket - -from pyemitter import Emitter -import logging - -log = logging.getLogger(__name__) - - -class ActivityMeta(type): - def __getitem__(self, key): - for (weight, source) in self.registered: - if source.name == key: - return source - - return None - - -@six.add_metaclass(ActivityMeta) -class Activity(Emitter): - registered = [] - - def __init__(self, sources=None): - self.available = self.get_available(sources) - self.enabled = [] - - def start(self, sources=None): - # TODO async start - - if sources is not None: - self.available = self.get_available(sources) - - # Test methods until an available method is found - for weight, source in self.available: - if weight is None: - # None = always start - self.start_source(source) - elif source.test(): - # Test passed - self.start_source(source) - else: - log.info('activity source "%s" is not available', source.name) - - log.info( - 'Finished starting %s method(s): %s', - len(self.enabled), - ', '.join([('"%s"' % source.name) for source in self.enabled]) - ) - - def start_source(self, source): - instance = source(self) - instance.start() - - self.enabled.append(instance) - - def __getitem__(self, key): - for (weight, source) in self.registered: - if source.name == key: - return source - - return None - - @classmethod - def get_available(cls, sources): - if sources: - return [ - (weight, source) for (weight, source) in cls.registered - if source.name in sources - ] - - return cls.registered - - @classmethod - def register(cls, source, weight=None): - item = (weight, source) - - # weight = None, highest priority - if weight is None: - cls.registered.insert(0, item) - return - - # insert in DESC order - for x in xrange(len(cls.registered)): - w, _ = cls.registered[x] - - if w is not None and w < weight: - cls.registered.insert(x, item) - return - - # otherwise append - cls.registered.append(item) - -# Register activity sources -Activity.register(WebSocket) -Activity.register(Logging, weight=1) diff --git a/libs/plex_activity/core/__init__.py b/libs/plex_activity/core/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/libs/plex_activity/core/helpers.py b/libs/plex_activity/core/helpers.py deleted file mode 100644 index 8ce99c65e..000000000 --- a/libs/plex_activity/core/helpers.py +++ /dev/null @@ -1,44 +0,0 @@ -def str_format(s, *args, **kwargs): - """Return a formatted version of S, using substitutions from args and kwargs. - - (Roughly matches the functionality of str.format but ensures compatibility with Python 2.5) - """ - - args = list(args) - - x = 0 - while x < len(s): - # Skip non-start token characters - if s[x] != '{': - x += 1 - continue - - end_pos = s.find('}', x) - - # If end character can't be found, move to next character - if end_pos == -1: - x += 1 - continue - - name = s[x + 1:end_pos] - - # Ensure token name is alpha numeric - if not name.isalnum(): - x += 1 - continue - - # Try find value for token - value = args.pop(0) if args else kwargs.get(name) - - if value: - value = str(value) - - # Replace token with value - s = s[:x] + value + s[end_pos + 1:] - - # Update current position - x = x + len(value) - 1 - - x += 1 - - return s diff --git a/libs/plex_activity/sources/__init__.py b/libs/plex_activity/sources/__init__.py deleted file mode 100644 index adc1c937e..000000000 --- a/libs/plex_activity/sources/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from plex_activity.sources.s_logging import Logging -from plex_activity.sources.s_websocket import WebSocket - -__all__ = ['Logging', 'WebSocket'] diff --git a/libs/plex_activity/sources/base.py b/libs/plex_activity/sources/base.py deleted file mode 100644 index 773126afd..000000000 --- a/libs/plex_activity/sources/base.py +++ /dev/null @@ -1,24 +0,0 @@ -from pyemitter import Emitter -from threading import Thread -import logging - -log = logging.getLogger(__name__) - - -class Source(Emitter): - name = None - - def __init__(self): - self.thread = Thread(target=self._run_wrapper) - - def start(self): - self.thread.start() - - def run(self): - pass - - def _run_wrapper(self): - try: - self.run() - except Exception as ex: - log.error('Exception raised in "%s" activity source: %s', self.name, ex, exc_info=True) diff --git a/libs/plex_activity/sources/s_logging/__init__.py b/libs/plex_activity/sources/s_logging/__init__.py deleted file mode 100644 index 949d13b34..000000000 --- a/libs/plex_activity/sources/s_logging/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from plex_activity.sources.s_logging.main import Logging - -__all__ = ['Logging'] diff --git a/libs/plex_activity/sources/s_logging/main.py b/libs/plex_activity/sources/s_logging/main.py deleted file mode 100644 index 1cae499c7..000000000 --- a/libs/plex_activity/sources/s_logging/main.py +++ /dev/null @@ -1,249 +0,0 @@ -from plex import Plex -from plex_activity.sources.base import Source -from plex_activity.sources.s_logging.parsers import NowPlayingParser, ScrobbleParser - -from asio import ASIO -from asio.file import SEEK_ORIGIN_CURRENT -from io import BufferedReader -import inspect -import logging -import os -import platform -import time - -log = logging.getLogger(__name__) - -PATH_HINTS = { - 'Darwin': [ - lambda: os.path.join(os.getenv('HOME'), 'Library/Logs/Plex Media Server.log') - ], - 'FreeBSD': [ - # FreeBSD - '/usr/local/plexdata/Plex Media Server/Logs/Plex Media Server.log', - '/usr/local/plexdata-plexpass/Plex Media Server/Logs/Plex Media Server.log', - - # FreeNAS - '/usr/pbi/plexmediaserver-amd64/plexdata/Plex Media Server/Logs/Plex Media Server.log', - '/var/db/plexdata/Plex Media Server/Logs/Plex Media Server.log', - '/var/db/plexdata-plexpass/Plex Media Server/Logs/Plex Media Server.log' - ], - 'Linux': [ - # QNAP - '/share/HDA_DATA/.qpkg/PlexMediaServer/Library/Plex Media Server/Logs/Plex Media Server.log', - - # Debian - '/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs/Plex Media Server.log' - ], - 'Windows': [ - lambda: os.path.join(os.getenv('LOCALAPPDATA'), 'Plex Media Server\\Logs\\Plex Media Server.log') - ] -} - - -class Logging(Source): - name = 'logging' - events = [ - 'logging.playing', - 'logging.action.played', - 'logging.action.unplayed' - ] - - parsers = [] - - path = None - path_hints = PATH_HINTS - - def __init__(self, activity): - super(Logging, self).__init__() - - self.parsers = [p(self) for p in Logging.parsers] - - self.file = None - self.reader = None - - self.path = None - - # Pipe events to the main activity instance - self.pipe(self.events, activity) - - def run(self): - line = self.read_line_retry(ping=True, stale_sleep=0.5) - if not line: - log.info('Unable to read log file') - return - - log.debug('Ready') - - while True: - # Grab the next line of the log - line = self.read_line_retry(ping=True) - - if line: - self.process(line) - else: - log.info('Unable to read log file') - - def process(self, line): - for parser in self.parsers: - if parser.process(line): - return True - - return False - - def read_line(self): - if not self.file: - path = self.get_path() - if not path: - raise Exception('Unable to find the location of "Plex Media Server.log"') - - # Open file - self.file = ASIO.open(path, opener=False) - self.file.seek(self.file.get_size(), SEEK_ORIGIN_CURRENT) - - # Create buffered reader - self.reader = BufferedReader(self.file) - - self.path = self.file.get_path() - log.info('Opened file path: "%s"' % self.path) - - return self.reader.readline() - - def read_line_retry(self, timeout=60, ping=False, stale_sleep=1.0): - line = None - stale_since = None - - while not line: - line = self.read_line() - - if line: - stale_since = None - time.sleep(0.05) - break - - if stale_since is None: - stale_since = time.time() - time.sleep(stale_sleep) - continue - elif (time.time() - stale_since) > timeout: - return None - elif (time.time() - stale_since) > timeout / 2: - # Nothing returned for 5 seconds - if self.file.get_path() != self.path: - log.debug("Log file moved (probably rotated), closing") - self.close() - elif ping: - # Ping server to see if server is still active - Plex.detail() - ping = False - - time.sleep(stale_sleep) - - return line - - def close(self): - if not self.file: - return - - try: - # Close the buffered reader - self.reader.close() - except Exception as ex: - log.error('reader.close() - raised exception: %s', ex, exc_info=True) - finally: - self.reader = None - - try: - # Close the file handle - self.file.close() - except OSError as ex: - if ex.errno == 9: - # Bad file descriptor, already closed? - log.info('file.close() - ignoring raised exception: %s (already closed)', ex) - else: - log.error('file.close() - raised exception: %s', ex, exc_info=True) - except Exception as ex: - log.error('file.close() - raised exception: %s', ex, exc_info=True) - finally: - self.file = None - - @classmethod - def get_path(cls): - if cls.path: - return cls.path - - hints = cls.get_hints() - - log.debug('hints: %r', hints) - - if not hints: - log.error('Unable to find any hints for "%s", operating system not supported', platform.system()) - return None - - for hint in hints: - log.debug('Testing if "%s" exists', hint) - - if os.path.exists(hint): - cls.path = hint - break - - if cls.path: - log.debug('Using the path: %r', cls.path) - else: - log.error('Unable to find a valid path for "Plex Media Server.log"', extra={ - 'data': { - 'hints': hints - } - }) - - return cls.path - - @classmethod - def add_hint(cls, path, system=None): - if system not in cls.path_hints: - cls.path_hints[system] = [] - - cls.path_hints[system].append(path) - - @classmethod - def get_hints(cls): - # Retrieve system hints - hints_system = PATH_HINTS.get(platform.system(), []) - - # Retrieve global hints - hints_global = PATH_HINTS.get(None, []) - - # Retrieve hint from server preferences (if available) - data_path = Plex[':/prefs'].get('LocalAppDataPath') - - if data_path: - hints_global.append(os.path.join(data_path.value, "Plex Media Server", "Logs", "Plex Media Server.log")) - else: - log.info('Unable to retrieve "LocalAppDataPath" from server') - - hints = [] - - for hint in (hints_global + hints_system): - # Resolve hint function - if inspect.isfunction(hint): - hint = hint() - - # Check for duplicate - if hint in hints: - continue - - hints.append(hint) - - return hints - - @classmethod - def test(cls): - # TODO "Logging" source testing - return True - - @classmethod - def register(cls, parser): - cls.parsers.append(parser) - - -Logging.register(NowPlayingParser) -Logging.register(ScrobbleParser) diff --git a/libs/plex_activity/sources/s_logging/parsers/__init__.py b/libs/plex_activity/sources/s_logging/parsers/__init__.py deleted file mode 100644 index 4792a4548..000000000 --- a/libs/plex_activity/sources/s_logging/parsers/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from plex_activity.sources.s_logging.parsers.now_playing import NowPlayingParser -from plex_activity.sources.s_logging.parsers.scrobble import ScrobbleParser - -__all__ = ['NowPlayingParser', 'ScrobbleParser'] diff --git a/libs/plex_activity/sources/s_logging/parsers/base.py b/libs/plex_activity/sources/s_logging/parsers/base.py deleted file mode 100644 index b8fecf04d..000000000 --- a/libs/plex_activity/sources/s_logging/parsers/base.py +++ /dev/null @@ -1,96 +0,0 @@ -from plex.lib.six.moves import urllib_parse as urlparse -from plex_activity.core.helpers import str_format - -from pyemitter import Emitter -import logging -import re - -log = logging.getLogger(__name__) - -LOG_PATTERN = r'^.*?\[\w+\]\s\w+\s-\s{message}$' -REQUEST_HEADER_PATTERN = str_format(LOG_PATTERN, message=r"Request: (\[(?P
.*?):(?P\d+)[^]]*\]\s)?{method} {path}.*?") - -IGNORE_PATTERNS = [ - r'error parsing allowedNetworks.*?', - r'Comparing request from.*?', - r'(Auth: )?We found auth token (.*?), enabling token-based authentication\.', - r'(Auth: )?Came in with a super-token, authorization succeeded\.', - r'(Auth: )?Refreshing tokens inside the token-based authentication filter\.', - r'\[Now\] Updated play state for .*?', - r'Play progress on .*? - got played .*? ms by account .*?!', - r'(Statistics: )?\(.*?\) Reporting active playback in state \d+ of type \d+ \(.*?\) for account \d+', - r'Request: \[.*?\] (GET|PUT) /video/:/transcode/.*?', - r'Received transcode session ping for session .*?' -] - -IGNORE_REGEX = re.compile(str_format(LOG_PATTERN, message='(%s)' % ('|'.join('(%s)' % x for x in IGNORE_PATTERNS))), re.IGNORECASE) - - -PARAM_REGEX = re.compile(str_format(LOG_PATTERN, message=r' \* (?P.*?) =\> (?P.*?)'), re.IGNORECASE) - - -class Parser(Emitter): - def __init__(self, core): - self.core = core - - def read_parameters(self, *match_functions): - match_functions = [self.parameter_match] + list(match_functions) - - info = {} - - while True: - line = self.core.read_line_retry(timeout=5) - if not line: - log.info('Unable to read log file') - return {} - - # Run through each match function to find a result - match = None - for func in match_functions: - match = func(line) - - if match is not None: - break - - # Update info dict with result, otherwise finish reading - if match: - info.update(match) - elif match is None and IGNORE_REGEX.match(line.strip()) is None: - log.debug('break on "%s"', line.strip()) - break - - return info - - def process(self, line): - raise NotImplementedError() - - @staticmethod - def parameter_match(line): - match = PARAM_REGEX.match(line.strip()) - if not match: - return None - - match = match.groupdict() - - return {match['key']: match['value']} - - @staticmethod - def regex_match(regex, line): - match = regex.match(line.strip()) - if not match: - return None - - return match.groupdict() - - @staticmethod - def query(match, value): - if not value: - return - - try: - parameters = urlparse.parse_qsl(value, strict_parsing=True) - except ValueError: - return - - for key, value in parameters: - match.setdefault(key, value) diff --git a/libs/plex_activity/sources/s_logging/parsers/now_playing.py b/libs/plex_activity/sources/s_logging/parsers/now_playing.py deleted file mode 100644 index c7242414c..000000000 --- a/libs/plex_activity/sources/s_logging/parsers/now_playing.py +++ /dev/null @@ -1,116 +0,0 @@ -from plex_activity.core.helpers import str_format -from plex_activity.sources.s_logging.parsers.base import Parser, LOG_PATTERN, REQUEST_HEADER_PATTERN - -import logging -import re - -log = logging.getLogger(__name__) - -PLAYING_HEADER_PATTERN = str_format(REQUEST_HEADER_PATTERN, method="GET", path="/:/(?Ptimeline|progress)/?(?:\?(?P.*?))?\s") -PLAYING_HEADER_REGEX = re.compile(PLAYING_HEADER_PATTERN, re.IGNORECASE) - -RANGE_REGEX = re.compile(str_format(LOG_PATTERN, message=r'Request range: \d+ to \d+'), re.IGNORECASE) -CLIENT_REGEX = re.compile(str_format(LOG_PATTERN, message=r'Client \[(?P.*?)\].*?'), re.IGNORECASE) - -NOW_USER_REGEX = re.compile(str_format(LOG_PATTERN, message=r'\[Now\] User is (?P.+) \(ID: (?P\d+)\)'), re.IGNORECASE) -NOW_CLIENT_REGEX = re.compile(str_format(LOG_PATTERN, message=r'\[Now\] Device is (?P.+?) \((?P.+)\)\.'), re.IGNORECASE) - - -class NowPlayingParser(Parser): - required_info = [ - 'ratingKey', - 'state', 'time' - ] - - extra_info = [ - 'duration', - - 'user_name', 'user_id', - 'machineIdentifier', 'client' - ] - - events = [ - 'logging.playing' - ] - - def __init__(self, main): - super(NowPlayingParser, self).__init__(main) - - # Pipe events to the main logging activity instance - self.pipe(self.events, main) - - def process(self, line): - header_match = PLAYING_HEADER_REGEX.match(line) - if not header_match: - return False - - activity_type = header_match.group('type') - - # Get a match from the activity entries - if activity_type == 'timeline': - match = self.timeline() - elif activity_type == 'progress': - match = self.progress() - else: - log.warn('Unknown activity type "%s"', activity_type) - return True - - print match, activity_type - - if match is None: - match = {} - - # Extend match with query info - self.query(match, header_match.group('query')) - - # Ensure we successfully matched a result - if not match: - return True - - # Sanitize the activity result - info = { - 'address': header_match.group('address'), - 'port': header_match.group('port') - } - - # - Get required info parameters - for key in self.required_info: - if key in match and match[key] is not None: - info[key] = match[key] - else: - log.info('Invalid activity match, missing key %s (matched keys: %s)', key, match.keys()) - return True - - # - Add in any extra info parameters - for key in self.extra_info: - if key in match: - info[key] = match[key] - else: - info[key] = None - - # Update the scrobbler with the current state - self.emit('logging.playing', info) - return True - - def timeline(self): - return self.read_parameters( - lambda line: self.regex_match(CLIENT_REGEX, line), - lambda line: self.regex_match(RANGE_REGEX, line), - - # [Now]* entries - lambda line: self.regex_match(NOW_USER_REGEX, line), - lambda line: self.regex_match(NOW_CLIENT_REGEX, line), - ) - - def progress(self): - data = self.read_parameters() - - if not data: - return {} - - # Translate parameters into timeline-style form - return { - 'state': data.get('state'), - 'ratingKey': data.get('key'), - 'time': data.get('time') - } diff --git a/libs/plex_activity/sources/s_logging/parsers/scrobble.py b/libs/plex_activity/sources/s_logging/parsers/scrobble.py deleted file mode 100644 index a1b2c93f4..000000000 --- a/libs/plex_activity/sources/s_logging/parsers/scrobble.py +++ /dev/null @@ -1,38 +0,0 @@ -from plex_activity.core.helpers import str_format -from plex_activity.sources.s_logging.parsers.base import Parser, LOG_PATTERN - -import re - - -class ScrobbleParser(Parser): - pattern = str_format(LOG_PATTERN, message=r'Library item (?P\d+) \'(?P.*?)\' got (?P<action>(?:un)?played) by account (?P<account_key>\d+)!.*?') - regex = re.compile(pattern, re.IGNORECASE) - - events = [ - 'logging.action.played', - 'logging.action.unplayed' - ] - - def __init__(self, main): - super(ScrobbleParser, self).__init__(main) - - # Pipe events to the main logging activity instance - self.pipe(self.events, main) - - def process(self, line): - match = self.regex.match(line) - if not match: - return False - - action = match.group('action') - if not action: - return False - - self.emit('logging.action.%s' % action, { - 'account_key': match.group('account_key'), - 'rating_key': match.group('rating_key'), - - 'title': match.group('title') - }) - - return True diff --git a/libs/plex_activity/sources/s_websocket/__init__.py b/libs/plex_activity/sources/s_websocket/__init__.py deleted file mode 100644 index 2657b37a9..000000000 --- a/libs/plex_activity/sources/s_websocket/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from plex_activity.sources.s_websocket.main import WebSocket - -__all__ = ['WebSocket'] diff --git a/libs/plex_activity/sources/s_websocket/main.py b/libs/plex_activity/sources/s_websocket/main.py deleted file mode 100644 index 429498fc3..000000000 --- a/libs/plex_activity/sources/s_websocket/main.py +++ /dev/null @@ -1,298 +0,0 @@ -from plex import Plex -from plex.lib.six.moves.urllib_parse import urlencode -from plex_activity.sources.base import Source - -import json -import logging -import re -import time -import websocket - -log = logging.getLogger(__name__) - -SCANNING_REGEX = re.compile('Scanning the "(?P<section>.*?)" section', re.IGNORECASE) -SCAN_COMPLETE_REGEX = re.compile('Library scan complete', re.IGNORECASE) - -TIMELINE_STATES = { - 0: 'created', - 2: 'matching', - 3: 'downloading', - 4: 'loading', - 5: 'finished', - 6: 'analyzing', - 9: 'deleted' -} - - -class WebSocket(Source): - name = 'websocket' - events = [ - 'websocket.playing', - - 'websocket.scanner.started', - 'websocket.scanner.progress', - 'websocket.scanner.finished', - - 'websocket.timeline.created', - 'websocket.timeline.matching', - 'websocket.timeline.downloading', - 'websocket.timeline.loading', - 'websocket.timeline.finished', - 'websocket.timeline.analyzing', - 'websocket.timeline.deleted' - ] - - opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY) - - def __init__(self, activity): - super(WebSocket, self).__init__() - - self.ws = None - self.reconnects = 0 - - # Pipe events to the main activity instance - self.pipe(self.events, activity) - - def connect(self): - uri = 'ws://%s:%s/:/websockets/notifications' % ( - Plex.configuration.get('server.host', '127.0.0.1'), - Plex.configuration.get('server.port', 32400) - ) - - params = {} - - # Set authentication token (if one is available) - if Plex.configuration['authentication.token']: - params['X-Plex-Token'] = Plex.configuration['authentication.token'] - - # Append parameters to uri - if params: - uri += '?' + urlencode(params) - - # Create websocket connection - self.ws = websocket.create_connection(uri) - - def run(self): - self.connect() - - log.debug('Ready') - - while True: - try: - self.process(*self.receive()) - - # successfully received data, reset reconnects counter - self.reconnects = 0 - except websocket.WebSocketConnectionClosedException: - if self.reconnects <= 5: - self.reconnects += 1 - - # Increasing sleep interval between reconnections - if self.reconnects > 1: - time.sleep(2 * (self.reconnects - 1)) - - log.info('WebSocket connection has closed, reconnecting...') - self.connect() - else: - log.error('WebSocket connection unavailable, activity monitoring not available') - break - - def receive(self): - frame = self.ws.recv_frame() - - if not frame: - raise websocket.WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in self.opcode_data: - return frame.opcode, frame.data - elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: - self.ws.send_close() - return frame.opcode, None - elif frame.opcode == websocket.ABNF.OPCODE_PING: - self.ws.pong("Hi!") - - return None, None - - def process(self, opcode, data): - if opcode not in self.opcode_data: - return False - - try: - info = json.loads(data) - except UnicodeDecodeError as ex: - log.warn('Error decoding message from websocket: %s' % ex, extra={ - 'event': { - 'module': __name__, - 'name': 'process.loads.unicode_decode_error', - 'key': '%s:%s' % (ex.encoding, ex.reason) - } - }) - log.debug(data) - return False - except Exception as ex: - log.warn('Error decoding message from websocket: %s' % ex, extra={ - 'event': { - 'module': __name__, - 'name': 'process.load_exception', - 'key': ex.message - } - }) - log.debug(data) - return False - - # Handle modern messages (PMS 1.3.0+) - if type(info.get('NotificationContainer')) is dict: - info = info['NotificationContainer'] - - # Process message - m_type = info.get('type') - - if not m_type: - log.debug('Received message with no "type" parameter: %r', info) - return False - - # Pre-process message (if function exists) - process_func = getattr(self, 'process_%s' % m_type, None) - - if process_func and process_func(info): - return True - - # Emit raw message - return self.emit_notification('%s.notification.%s' % (self.name, m_type), info) - - def process_playing(self, info): - children = info.get('_children') or info.get('PlaySessionStateNotification') - - if not children: - log.debug('Received "playing" message with no children: %r', info) - return False - - return self.emit_notification('%s.playing' % self.name, children) - - def process_progress(self, info): - children = info.get('_children') or info.get('ProgressNotification') - - if not children: - log.debug('Received "progress" message with no children: %r', info) - return False - - for notification in children: - self.emit('%s.scanner.progress' % self.name, { - 'message': notification.get('message') - }) - - return True - - def process_status(self, info): - children = info.get('_children') or info.get('StatusNotification') - - if not children: - log.debug('Received "status" message with no children: %r', info) - return False - - # Process children - count = 0 - - for notification in children: - title = notification.get('title') - - if not title: - continue - - # Scan complete message - if SCAN_COMPLETE_REGEX.match(title): - self.emit('%s.scanner.finished' % self.name) - count += 1 - continue - - # Scanning message - match = SCANNING_REGEX.match(title) - - if not match: - continue - - section = match.group('section') - - if not section: - continue - - self.emit('%s.scanner.started' % self.name, {'section': section}) - count += 1 - - # Validate result - if count < 1: - log.debug('Received "status" message with no valid children: %r', info) - return False - - return True - - def process_timeline(self, info): - children = info.get('_children') or info.get('TimelineEntry') - - if not children: - log.debug('Received "timeline" message with no children: %r', info) - return False - - # Process children - count = 0 - - for entry in children: - state = TIMELINE_STATES.get(entry.get('state')) - - if not state: - continue - - self.emit('%s.timeline.%s' % (self.name, state), entry) - count += 1 - - # Validate result - if count < 1: - log.debug('Received "timeline" message with no valid children: %r', info) - return False - - return True - - # - # Helpers - # - - def emit_notification(self, name, info=None): - if info is None: - info = {} - - # Emit children - children = self._get_children(info) - - if children: - for child in children: - self.emit(name, child) - - return True - - # Emit objects - if info: - self.emit(name, info) - else: - self.emit(name) - - return True - - @staticmethod - def _get_children(info): - if type(info) is list: - return info - - if type(info) is not dict: - return None - - # Return legacy children - if info.get('_children'): - return info['_children'] - - # Search for modern children container - for key, value in info.items(): - key = key.lower() - - if (key.endswith('entry') or key.endswith('notification')) and type(value) is list: - return value - - return None