parent
eda8880357
commit
e815f47e4f
@ -1,15 +0,0 @@
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
__version__ = '0.7.1'
|
||||
|
||||
|
||||
try:
|
||||
from plex_activity import activity
|
||||
|
||||
# Global objects (using defaults)
|
||||
Activity = activity.Activity()
|
||||
except Exception as ex:
|
||||
log.warn('Unable to import submodules: %s - %s', ex, traceback.format_exc())
|
@ -1,96 +0,0 @@
|
||||
from plex.lib import six as six
|
||||
from plex.lib.six.moves import xrange
|
||||
from plex_activity.sources import Logging, WebSocket
|
||||
|
||||
from pyemitter import Emitter
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ActivityMeta(type):
|
||||
def __getitem__(self, key):
|
||||
for (weight, source) in self.registered:
|
||||
if source.name == key:
|
||||
return source
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@six.add_metaclass(ActivityMeta)
|
||||
class Activity(Emitter):
|
||||
registered = []
|
||||
|
||||
def __init__(self, sources=None):
|
||||
self.available = self.get_available(sources)
|
||||
self.enabled = []
|
||||
|
||||
def start(self, sources=None):
|
||||
# TODO async start
|
||||
|
||||
if sources is not None:
|
||||
self.available = self.get_available(sources)
|
||||
|
||||
# Test methods until an available method is found
|
||||
for weight, source in self.available:
|
||||
if weight is None:
|
||||
# None = always start
|
||||
self.start_source(source)
|
||||
elif source.test():
|
||||
# Test passed
|
||||
self.start_source(source)
|
||||
else:
|
||||
log.info('activity source "%s" is not available', source.name)
|
||||
|
||||
log.info(
|
||||
'Finished starting %s method(s): %s',
|
||||
len(self.enabled),
|
||||
', '.join([('"%s"' % source.name) for source in self.enabled])
|
||||
)
|
||||
|
||||
def start_source(self, source):
|
||||
instance = source(self)
|
||||
instance.start()
|
||||
|
||||
self.enabled.append(instance)
|
||||
|
||||
def __getitem__(self, key):
|
||||
for (weight, source) in self.registered:
|
||||
if source.name == key:
|
||||
return source
|
||||
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_available(cls, sources):
|
||||
if sources:
|
||||
return [
|
||||
(weight, source) for (weight, source) in cls.registered
|
||||
if source.name in sources
|
||||
]
|
||||
|
||||
return cls.registered
|
||||
|
||||
@classmethod
|
||||
def register(cls, source, weight=None):
|
||||
item = (weight, source)
|
||||
|
||||
# weight = None, highest priority
|
||||
if weight is None:
|
||||
cls.registered.insert(0, item)
|
||||
return
|
||||
|
||||
# insert in DESC order
|
||||
for x in xrange(len(cls.registered)):
|
||||
w, _ = cls.registered[x]
|
||||
|
||||
if w is not None and w < weight:
|
||||
cls.registered.insert(x, item)
|
||||
return
|
||||
|
||||
# otherwise append
|
||||
cls.registered.append(item)
|
||||
|
||||
# Register activity sources
|
||||
Activity.register(WebSocket)
|
||||
Activity.register(Logging, weight=1)
|
@ -1,44 +0,0 @@
|
||||
def str_format(s, *args, **kwargs):
|
||||
"""Return a formatted version of S, using substitutions from args and kwargs.
|
||||
|
||||
(Roughly matches the functionality of str.format but ensures compatibility with Python 2.5)
|
||||
"""
|
||||
|
||||
args = list(args)
|
||||
|
||||
x = 0
|
||||
while x < len(s):
|
||||
# Skip non-start token characters
|
||||
if s[x] != '{':
|
||||
x += 1
|
||||
continue
|
||||
|
||||
end_pos = s.find('}', x)
|
||||
|
||||
# If end character can't be found, move to next character
|
||||
if end_pos == -1:
|
||||
x += 1
|
||||
continue
|
||||
|
||||
name = s[x + 1:end_pos]
|
||||
|
||||
# Ensure token name is alpha numeric
|
||||
if not name.isalnum():
|
||||
x += 1
|
||||
continue
|
||||
|
||||
# Try find value for token
|
||||
value = args.pop(0) if args else kwargs.get(name)
|
||||
|
||||
if value:
|
||||
value = str(value)
|
||||
|
||||
# Replace token with value
|
||||
s = s[:x] + value + s[end_pos + 1:]
|
||||
|
||||
# Update current position
|
||||
x = x + len(value) - 1
|
||||
|
||||
x += 1
|
||||
|
||||
return s
|
@ -1,4 +0,0 @@
|
||||
from plex_activity.sources.s_logging import Logging
|
||||
from plex_activity.sources.s_websocket import WebSocket
|
||||
|
||||
__all__ = ['Logging', 'WebSocket']
|
@ -1,24 +0,0 @@
|
||||
from pyemitter import Emitter
|
||||
from threading import Thread
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Source(Emitter):
|
||||
name = None
|
||||
|
||||
def __init__(self):
|
||||
self.thread = Thread(target=self._run_wrapper)
|
||||
|
||||
def start(self):
|
||||
self.thread.start()
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
def _run_wrapper(self):
|
||||
try:
|
||||
self.run()
|
||||
except Exception as ex:
|
||||
log.error('Exception raised in "%s" activity source: %s', self.name, ex, exc_info=True)
|
@ -1,3 +0,0 @@
|
||||
from plex_activity.sources.s_logging.main import Logging
|
||||
|
||||
__all__ = ['Logging']
|
@ -1,249 +0,0 @@
|
||||
from plex import Plex
|
||||
from plex_activity.sources.base import Source
|
||||
from plex_activity.sources.s_logging.parsers import NowPlayingParser, ScrobbleParser
|
||||
|
||||
from asio import ASIO
|
||||
from asio.file import SEEK_ORIGIN_CURRENT
|
||||
from io import BufferedReader
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import time
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
PATH_HINTS = {
|
||||
'Darwin': [
|
||||
lambda: os.path.join(os.getenv('HOME'), 'Library/Logs/Plex Media Server.log')
|
||||
],
|
||||
'FreeBSD': [
|
||||
# FreeBSD
|
||||
'/usr/local/plexdata/Plex Media Server/Logs/Plex Media Server.log',
|
||||
'/usr/local/plexdata-plexpass/Plex Media Server/Logs/Plex Media Server.log',
|
||||
|
||||
# FreeNAS
|
||||
'/usr/pbi/plexmediaserver-amd64/plexdata/Plex Media Server/Logs/Plex Media Server.log',
|
||||
'/var/db/plexdata/Plex Media Server/Logs/Plex Media Server.log',
|
||||
'/var/db/plexdata-plexpass/Plex Media Server/Logs/Plex Media Server.log'
|
||||
],
|
||||
'Linux': [
|
||||
# QNAP
|
||||
'/share/HDA_DATA/.qpkg/PlexMediaServer/Library/Plex Media Server/Logs/Plex Media Server.log',
|
||||
|
||||
# Debian
|
||||
'/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs/Plex Media Server.log'
|
||||
],
|
||||
'Windows': [
|
||||
lambda: os.path.join(os.getenv('LOCALAPPDATA'), 'Plex Media Server\\Logs\\Plex Media Server.log')
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
class Logging(Source):
|
||||
name = 'logging'
|
||||
events = [
|
||||
'logging.playing',
|
||||
'logging.action.played',
|
||||
'logging.action.unplayed'
|
||||
]
|
||||
|
||||
parsers = []
|
||||
|
||||
path = None
|
||||
path_hints = PATH_HINTS
|
||||
|
||||
def __init__(self, activity):
|
||||
super(Logging, self).__init__()
|
||||
|
||||
self.parsers = [p(self) for p in Logging.parsers]
|
||||
|
||||
self.file = None
|
||||
self.reader = None
|
||||
|
||||
self.path = None
|
||||
|
||||
# Pipe events to the main activity instance
|
||||
self.pipe(self.events, activity)
|
||||
|
||||
def run(self):
|
||||
line = self.read_line_retry(ping=True, stale_sleep=0.5)
|
||||
if not line:
|
||||
log.info('Unable to read log file')
|
||||
return
|
||||
|
||||
log.debug('Ready')
|
||||
|
||||
while True:
|
||||
# Grab the next line of the log
|
||||
line = self.read_line_retry(ping=True)
|
||||
|
||||
if line:
|
||||
self.process(line)
|
||||
else:
|
||||
log.info('Unable to read log file')
|
||||
|
||||
def process(self, line):
|
||||
for parser in self.parsers:
|
||||
if parser.process(line):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def read_line(self):
|
||||
if not self.file:
|
||||
path = self.get_path()
|
||||
if not path:
|
||||
raise Exception('Unable to find the location of "Plex Media Server.log"')
|
||||
|
||||
# Open file
|
||||
self.file = ASIO.open(path, opener=False)
|
||||
self.file.seek(self.file.get_size(), SEEK_ORIGIN_CURRENT)
|
||||
|
||||
# Create buffered reader
|
||||
self.reader = BufferedReader(self.file)
|
||||
|
||||
self.path = self.file.get_path()
|
||||
log.info('Opened file path: "%s"' % self.path)
|
||||
|
||||
return self.reader.readline()
|
||||
|
||||
def read_line_retry(self, timeout=60, ping=False, stale_sleep=1.0):
|
||||
line = None
|
||||
stale_since = None
|
||||
|
||||
while not line:
|
||||
line = self.read_line()
|
||||
|
||||
if line:
|
||||
stale_since = None
|
||||
time.sleep(0.05)
|
||||
break
|
||||
|
||||
if stale_since is None:
|
||||
stale_since = time.time()
|
||||
time.sleep(stale_sleep)
|
||||
continue
|
||||
elif (time.time() - stale_since) > timeout:
|
||||
return None
|
||||
elif (time.time() - stale_since) > timeout / 2:
|
||||
# Nothing returned for 5 seconds
|
||||
if self.file.get_path() != self.path:
|
||||
log.debug("Log file moved (probably rotated), closing")
|
||||
self.close()
|
||||
elif ping:
|
||||
# Ping server to see if server is still active
|
||||
Plex.detail()
|
||||
ping = False
|
||||
|
||||
time.sleep(stale_sleep)
|
||||
|
||||
return line
|
||||
|
||||
def close(self):
|
||||
if not self.file:
|
||||
return
|
||||
|
||||
try:
|
||||
# Close the buffered reader
|
||||
self.reader.close()
|
||||
except Exception as ex:
|
||||
log.error('reader.close() - raised exception: %s', ex, exc_info=True)
|
||||
finally:
|
||||
self.reader = None
|
||||
|
||||
try:
|
||||
# Close the file handle
|
||||
self.file.close()
|
||||
except OSError as ex:
|
||||
if ex.errno == 9:
|
||||
# Bad file descriptor, already closed?
|
||||
log.info('file.close() - ignoring raised exception: %s (already closed)', ex)
|
||||
else:
|
||||
log.error('file.close() - raised exception: %s', ex, exc_info=True)
|
||||
except Exception as ex:
|
||||
log.error('file.close() - raised exception: %s', ex, exc_info=True)
|
||||
finally:
|
||||
self.file = None
|
||||
|
||||
@classmethod
|
||||
def get_path(cls):
|
||||
if cls.path:
|
||||
return cls.path
|
||||
|
||||
hints = cls.get_hints()
|
||||
|
||||
log.debug('hints: %r', hints)
|
||||
|
||||
if not hints:
|
||||
log.error('Unable to find any hints for "%s", operating system not supported', platform.system())
|
||||
return None
|
||||
|
||||
for hint in hints:
|
||||
log.debug('Testing if "%s" exists', hint)
|
||||
|
||||
if os.path.exists(hint):
|
||||
cls.path = hint
|
||||
break
|
||||
|
||||
if cls.path:
|
||||
log.debug('Using the path: %r', cls.path)
|
||||
else:
|
||||
log.error('Unable to find a valid path for "Plex Media Server.log"', extra={
|
||||
'data': {
|
||||
'hints': hints
|
||||
}
|
||||
})
|
||||
|
||||
return cls.path
|
||||
|
||||
@classmethod
|
||||
def add_hint(cls, path, system=None):
|
||||
if system not in cls.path_hints:
|
||||
cls.path_hints[system] = []
|
||||
|
||||
cls.path_hints[system].append(path)
|
||||
|
||||
@classmethod
|
||||
def get_hints(cls):
|
||||
# Retrieve system hints
|
||||
hints_system = PATH_HINTS.get(platform.system(), [])
|
||||
|
||||
# Retrieve global hints
|
||||
hints_global = PATH_HINTS.get(None, [])
|
||||
|
||||
# Retrieve hint from server preferences (if available)
|
||||
data_path = Plex[':/prefs'].get('LocalAppDataPath')
|
||||
|
||||
if data_path:
|
||||
hints_global.append(os.path.join(data_path.value, "Plex Media Server", "Logs", "Plex Media Server.log"))
|
||||
else:
|
||||
log.info('Unable to retrieve "LocalAppDataPath" from server')
|
||||
|
||||
hints = []
|
||||
|
||||
for hint in (hints_global + hints_system):
|
||||
# Resolve hint function
|
||||
if inspect.isfunction(hint):
|
||||
hint = hint()
|
||||
|
||||
# Check for duplicate
|
||||
if hint in hints:
|
||||
continue
|
||||
|
||||
hints.append(hint)
|
||||
|
||||
return hints
|
||||
|
||||
@classmethod
|
||||
def test(cls):
|
||||
# TODO "Logging" source testing
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def register(cls, parser):
|
||||
cls.parsers.append(parser)
|
||||
|
||||
|
||||
Logging.register(NowPlayingParser)
|
||||
Logging.register(ScrobbleParser)
|
@ -1,4 +0,0 @@
|
||||
from plex_activity.sources.s_logging.parsers.now_playing import NowPlayingParser
|
||||
from plex_activity.sources.s_logging.parsers.scrobble import ScrobbleParser
|
||||
|
||||
__all__ = ['NowPlayingParser', 'ScrobbleParser']
|
@ -1,96 +0,0 @@
|
||||
from plex.lib.six.moves import urllib_parse as urlparse
|
||||
from plex_activity.core.helpers import str_format
|
||||
|
||||
from pyemitter import Emitter
|
||||
import logging
|
||||
import re
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
LOG_PATTERN = r'^.*?\[\w+\]\s\w+\s-\s{message}$'
|
||||
REQUEST_HEADER_PATTERN = str_format(LOG_PATTERN, message=r"Request: (\[(?P<address>.*?):(?P<port>\d+)[^]]*\]\s)?{method} {path}.*?")
|
||||
|
||||
IGNORE_PATTERNS = [
|
||||
r'error parsing allowedNetworks.*?',
|
||||
r'Comparing request from.*?',
|
||||
r'(Auth: )?We found auth token (.*?), enabling token-based authentication\.',
|
||||
r'(Auth: )?Came in with a super-token, authorization succeeded\.',
|
||||
r'(Auth: )?Refreshing tokens inside the token-based authentication filter\.',
|
||||
r'\[Now\] Updated play state for .*?',
|
||||
r'Play progress on .*? - got played .*? ms by account .*?!',
|
||||
r'(Statistics: )?\(.*?\) Reporting active playback in state \d+ of type \d+ \(.*?\) for account \d+',
|
||||
r'Request: \[.*?\] (GET|PUT) /video/:/transcode/.*?',
|
||||
r'Received transcode session ping for session .*?'
|
||||
]
|
||||
|
||||
IGNORE_REGEX = re.compile(str_format(LOG_PATTERN, message='(%s)' % ('|'.join('(%s)' % x for x in IGNORE_PATTERNS))), re.IGNORECASE)
|
||||
|
||||
|
||||
PARAM_REGEX = re.compile(str_format(LOG_PATTERN, message=r' \* (?P<key>.*?) =\> (?P<value>.*?)'), re.IGNORECASE)
|
||||
|
||||
|
||||
class Parser(Emitter):
|
||||
def __init__(self, core):
|
||||
self.core = core
|
||||
|
||||
def read_parameters(self, *match_functions):
|
||||
match_functions = [self.parameter_match] + list(match_functions)
|
||||
|
||||
info = {}
|
||||
|
||||
while True:
|
||||
line = self.core.read_line_retry(timeout=5)
|
||||
if not line:
|
||||
log.info('Unable to read log file')
|
||||
return {}
|
||||
|
||||
# Run through each match function to find a result
|
||||
match = None
|
||||
for func in match_functions:
|
||||
match = func(line)
|
||||
|
||||
if match is not None:
|
||||
break
|
||||
|
||||
# Update info dict with result, otherwise finish reading
|
||||
if match:
|
||||
info.update(match)
|
||||
elif match is None and IGNORE_REGEX.match(line.strip()) is None:
|
||||
log.debug('break on "%s"', line.strip())
|
||||
break
|
||||
|
||||
return info
|
||||
|
||||
def process(self, line):
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def parameter_match(line):
|
||||
match = PARAM_REGEX.match(line.strip())
|
||||
if not match:
|
||||
return None
|
||||
|
||||
match = match.groupdict()
|
||||
|
||||
return {match['key']: match['value']}
|
||||
|
||||
@staticmethod
|
||||
def regex_match(regex, line):
|
||||
match = regex.match(line.strip())
|
||||
if not match:
|
||||
return None
|
||||
|
||||
return match.groupdict()
|
||||
|
||||
@staticmethod
|
||||
def query(match, value):
|
||||
if not value:
|
||||
return
|
||||
|
||||
try:
|
||||
parameters = urlparse.parse_qsl(value, strict_parsing=True)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
for key, value in parameters:
|
||||
match.setdefault(key, value)
|
@ -1,116 +0,0 @@
|
||||
from plex_activity.core.helpers import str_format
|
||||
from plex_activity.sources.s_logging.parsers.base import Parser, LOG_PATTERN, REQUEST_HEADER_PATTERN
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
PLAYING_HEADER_PATTERN = str_format(REQUEST_HEADER_PATTERN, method="GET", path="/:/(?P<type>timeline|progress)/?(?:\?(?P<query>.*?))?\s")
|
||||
PLAYING_HEADER_REGEX = re.compile(PLAYING_HEADER_PATTERN, re.IGNORECASE)
|
||||
|
||||
RANGE_REGEX = re.compile(str_format(LOG_PATTERN, message=r'Request range: \d+ to \d+'), re.IGNORECASE)
|
||||
CLIENT_REGEX = re.compile(str_format(LOG_PATTERN, message=r'Client \[(?P<machineIdentifier>.*?)\].*?'), re.IGNORECASE)
|
||||
|
||||
NOW_USER_REGEX = re.compile(str_format(LOG_PATTERN, message=r'\[Now\] User is (?P<user_name>.+) \(ID: (?P<user_id>\d+)\)'), re.IGNORECASE)
|
||||
NOW_CLIENT_REGEX = re.compile(str_format(LOG_PATTERN, message=r'\[Now\] Device is (?P<product>.+?) \((?P<client>.+)\)\.'), re.IGNORECASE)
|
||||
|
||||
|
||||
class NowPlayingParser(Parser):
|
||||
required_info = [
|
||||
'ratingKey',
|
||||
'state', 'time'
|
||||
]
|
||||
|
||||
extra_info = [
|
||||
'duration',
|
||||
|
||||
'user_name', 'user_id',
|
||||
'machineIdentifier', 'client'
|
||||
]
|
||||
|
||||
events = [
|
||||
'logging.playing'
|
||||
]
|
||||
|
||||
def __init__(self, main):
|
||||
super(NowPlayingParser, self).__init__(main)
|
||||
|
||||
# Pipe events to the main logging activity instance
|
||||
self.pipe(self.events, main)
|
||||
|
||||
def process(self, line):
|
||||
header_match = PLAYING_HEADER_REGEX.match(line)
|
||||
if not header_match:
|
||||
return False
|
||||
|
||||
activity_type = header_match.group('type')
|
||||
|
||||
# Get a match from the activity entries
|
||||
if activity_type == 'timeline':
|
||||
match = self.timeline()
|
||||
elif activity_type == 'progress':
|
||||
match = self.progress()
|
||||
else:
|
||||
log.warn('Unknown activity type "%s"', activity_type)
|
||||
return True
|
||||
|
||||
print match, activity_type
|
||||
|
||||
if match is None:
|
||||
match = {}
|
||||
|
||||
# Extend match with query info
|
||||
self.query(match, header_match.group('query'))
|
||||
|
||||
# Ensure we successfully matched a result
|
||||
if not match:
|
||||
return True
|
||||
|
||||
# Sanitize the activity result
|
||||
info = {
|
||||
'address': header_match.group('address'),
|
||||
'port': header_match.group('port')
|
||||
}
|
||||
|
||||
# - Get required info parameters
|
||||
for key in self.required_info:
|
||||
if key in match and match[key] is not None:
|
||||
info[key] = match[key]
|
||||
else:
|
||||
log.info('Invalid activity match, missing key %s (matched keys: %s)', key, match.keys())
|
||||
return True
|
||||
|
||||
# - Add in any extra info parameters
|
||||
for key in self.extra_info:
|
||||
if key in match:
|
||||
info[key] = match[key]
|
||||
else:
|
||||
info[key] = None
|
||||
|
||||
# Update the scrobbler with the current state
|
||||
self.emit('logging.playing', info)
|
||||
return True
|
||||
|
||||
def timeline(self):
|
||||
return self.read_parameters(
|
||||
lambda line: self.regex_match(CLIENT_REGEX, line),
|
||||
lambda line: self.regex_match(RANGE_REGEX, line),
|
||||
|
||||
# [Now]* entries
|
||||
lambda line: self.regex_match(NOW_USER_REGEX, line),
|
||||
lambda line: self.regex_match(NOW_CLIENT_REGEX, line),
|
||||
)
|
||||
|
||||
def progress(self):
|
||||
data = self.read_parameters()
|
||||
|
||||
if not data:
|
||||
return {}
|
||||
|
||||
# Translate parameters into timeline-style form
|
||||
return {
|
||||
'state': data.get('state'),
|
||||
'ratingKey': data.get('key'),
|
||||
'time': data.get('time')
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
from plex_activity.core.helpers import str_format
|
||||
from plex_activity.sources.s_logging.parsers.base import Parser, LOG_PATTERN
|
||||
|
||||
import re
|
||||
|
||||
|
||||
class ScrobbleParser(Parser):
|
||||
pattern = str_format(LOG_PATTERN, message=r'Library item (?P<rating_key>\d+) \'(?P<title>.*?)\' got (?P<action>(?:un)?played) by account (?P<account_key>\d+)!.*?')
|
||||
regex = re.compile(pattern, re.IGNORECASE)
|
||||
|
||||
events = [
|
||||
'logging.action.played',
|
||||
'logging.action.unplayed'
|
||||
]
|
||||
|
||||
def __init__(self, main):
|
||||
super(ScrobbleParser, self).__init__(main)
|
||||
|
||||
# Pipe events to the main logging activity instance
|
||||
self.pipe(self.events, main)
|
||||
|
||||
def process(self, line):
|
||||
match = self.regex.match(line)
|
||||
if not match:
|
||||
return False
|
||||
|
||||
action = match.group('action')
|
||||
if not action:
|
||||
return False
|
||||
|
||||
self.emit('logging.action.%s' % action, {
|
||||
'account_key': match.group('account_key'),
|
||||
'rating_key': match.group('rating_key'),
|
||||
|
||||
'title': match.group('title')
|
||||
})
|
||||
|
||||
return True
|
@ -1,3 +0,0 @@
|
||||
from plex_activity.sources.s_websocket.main import WebSocket
|
||||
|
||||
__all__ = ['WebSocket']
|
@ -1,298 +0,0 @@
|
||||
from plex import Plex
|
||||
from plex.lib.six.moves.urllib_parse import urlencode
|
||||
from plex_activity.sources.base import Source
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import websocket
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SCANNING_REGEX = re.compile('Scanning the "(?P<section>.*?)" section', re.IGNORECASE)
|
||||
SCAN_COMPLETE_REGEX = re.compile('Library scan complete', re.IGNORECASE)
|
||||
|
||||
TIMELINE_STATES = {
|
||||
0: 'created',
|
||||
2: 'matching',
|
||||
3: 'downloading',
|
||||
4: 'loading',
|
||||
5: 'finished',
|
||||
6: 'analyzing',
|
||||
9: 'deleted'
|
||||
}
|
||||
|
||||
|
||||
class WebSocket(Source):
|
||||
name = 'websocket'
|
||||
events = [
|
||||
'websocket.playing',
|
||||
|
||||
'websocket.scanner.started',
|
||||
'websocket.scanner.progress',
|
||||
'websocket.scanner.finished',
|
||||
|
||||
'websocket.timeline.created',
|
||||
'websocket.timeline.matching',
|
||||
'websocket.timeline.downloading',
|
||||
'websocket.timeline.loading',
|
||||
'websocket.timeline.finished',
|
||||
'websocket.timeline.analyzing',
|
||||
'websocket.timeline.deleted'
|
||||
]
|
||||
|
||||
opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
|
||||
|
||||
def __init__(self, activity):
|
||||
super(WebSocket, self).__init__()
|
||||
|
||||
self.ws = None
|
||||
self.reconnects = 0
|
||||
|
||||
# Pipe events to the main activity instance
|
||||
self.pipe(self.events, activity)
|
||||
|
||||
def connect(self):
|
||||
uri = 'ws://%s:%s/:/websockets/notifications' % (
|
||||
Plex.configuration.get('server.host', '127.0.0.1'),
|
||||
Plex.configuration.get('server.port', 32400)
|
||||
)
|
||||
|
||||
params = {}
|
||||
|
||||
# Set authentication token (if one is available)
|
||||
if Plex.configuration['authentication.token']:
|
||||
params['X-Plex-Token'] = Plex.configuration['authentication.token']
|
||||
|
||||
# Append parameters to uri
|
||||
if params:
|
||||
uri += '?' + urlencode(params)
|
||||
|
||||
# Create websocket connection
|
||||
self.ws = websocket.create_connection(uri)
|
||||
|
||||
def run(self):
|
||||
self.connect()
|
||||
|
||||
log.debug('Ready')
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.process(*self.receive())
|
||||
|
||||
# successfully received data, reset reconnects counter
|
||||
self.reconnects = 0
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
if self.reconnects <= 5:
|
||||
self.reconnects += 1
|
||||
|
||||
# Increasing sleep interval between reconnections
|
||||
if self.reconnects > 1:
|
||||
time.sleep(2 * (self.reconnects - 1))
|
||||
|
||||
log.info('WebSocket connection has closed, reconnecting...')
|
||||
self.connect()
|
||||
else:
|
||||
log.error('WebSocket connection unavailable, activity monitoring not available')
|
||||
break
|
||||
|
||||
def receive(self):
|
||||
frame = self.ws.recv_frame()
|
||||
|
||||
if not frame:
|
||||
raise websocket.WebSocketException("Not a valid frame %s" % frame)
|
||||
elif frame.opcode in self.opcode_data:
|
||||
return frame.opcode, frame.data
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
|
||||
self.ws.send_close()
|
||||
return frame.opcode, None
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_PING:
|
||||
self.ws.pong("Hi!")
|
||||
|
||||
return None, None
|
||||
|
||||
def process(self, opcode, data):
|
||||
if opcode not in self.opcode_data:
|
||||
return False
|
||||
|
||||
try:
|
||||
info = json.loads(data)
|
||||
except UnicodeDecodeError as ex:
|
||||
log.warn('Error decoding message from websocket: %s' % ex, extra={
|
||||
'event': {
|
||||
'module': __name__,
|
||||
'name': 'process.loads.unicode_decode_error',
|
||||
'key': '%s:%s' % (ex.encoding, ex.reason)
|
||||
}
|
||||
})
|
||||
log.debug(data)
|
||||
return False
|
||||
except Exception as ex:
|
||||
log.warn('Error decoding message from websocket: %s' % ex, extra={
|
||||
'event': {
|
||||
'module': __name__,
|
||||
'name': 'process.load_exception',
|
||||
'key': ex.message
|
||||
}
|
||||
})
|
||||
log.debug(data)
|
||||
return False
|
||||
|
||||
# Handle modern messages (PMS 1.3.0+)
|
||||
if type(info.get('NotificationContainer')) is dict:
|
||||
info = info['NotificationContainer']
|
||||
|
||||
# Process message
|
||||
m_type = info.get('type')
|
||||
|
||||
if not m_type:
|
||||
log.debug('Received message with no "type" parameter: %r', info)
|
||||
return False
|
||||
|
||||
# Pre-process message (if function exists)
|
||||
process_func = getattr(self, 'process_%s' % m_type, None)
|
||||
|
||||
if process_func and process_func(info):
|
||||
return True
|
||||
|
||||
# Emit raw message
|
||||
return self.emit_notification('%s.notification.%s' % (self.name, m_type), info)
|
||||
|
||||
def process_playing(self, info):
|
||||
children = info.get('_children') or info.get('PlaySessionStateNotification')
|
||||
|
||||
if not children:
|
||||
log.debug('Received "playing" message with no children: %r', info)
|
||||
return False
|
||||
|
||||
return self.emit_notification('%s.playing' % self.name, children)
|
||||
|
||||
def process_progress(self, info):
|
||||
children = info.get('_children') or info.get('ProgressNotification')
|
||||
|
||||
if not children:
|
||||
log.debug('Received "progress" message with no children: %r', info)
|
||||
return False
|
||||
|
||||
for notification in children:
|
||||
self.emit('%s.scanner.progress' % self.name, {
|
||||
'message': notification.get('message')
|
||||
})
|
||||
|
||||
return True
|
||||
|
||||
def process_status(self, info):
|
||||
children = info.get('_children') or info.get('StatusNotification')
|
||||
|
||||
if not children:
|
||||
log.debug('Received "status" message with no children: %r', info)
|
||||
return False
|
||||
|
||||
# Process children
|
||||
count = 0
|
||||
|
||||
for notification in children:
|
||||
title = notification.get('title')
|
||||
|
||||
if not title:
|
||||
continue
|
||||
|
||||
# Scan complete message
|
||||
if SCAN_COMPLETE_REGEX.match(title):
|
||||
self.emit('%s.scanner.finished' % self.name)
|
||||
count += 1
|
||||
continue
|
||||
|
||||
# Scanning message
|
||||
match = SCANNING_REGEX.match(title)
|
||||
|
||||
if not match:
|
||||
continue
|
||||
|
||||
section = match.group('section')
|
||||
|
||||
if not section:
|
||||
continue
|
||||
|
||||
self.emit('%s.scanner.started' % self.name, {'section': section})
|
||||
count += 1
|
||||
|
||||
# Validate result
|
||||
if count < 1:
|
||||
log.debug('Received "status" message with no valid children: %r', info)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def process_timeline(self, info):
|
||||
children = info.get('_children') or info.get('TimelineEntry')
|
||||
|
||||
if not children:
|
||||
log.debug('Received "timeline" message with no children: %r', info)
|
||||
return False
|
||||
|
||||
# Process children
|
||||
count = 0
|
||||
|
||||
for entry in children:
|
||||
state = TIMELINE_STATES.get(entry.get('state'))
|
||||
|
||||
if not state:
|
||||
continue
|
||||
|
||||
self.emit('%s.timeline.%s' % (self.name, state), entry)
|
||||
count += 1
|
||||
|
||||
# Validate result
|
||||
if count < 1:
|
||||
log.debug('Received "timeline" message with no valid children: %r', info)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
#
|
||||
# Helpers
|
||||
#
|
||||
|
||||
def emit_notification(self, name, info=None):
|
||||
if info is None:
|
||||
info = {}
|
||||
|
||||
# Emit children
|
||||
children = self._get_children(info)
|
||||
|
||||
if children:
|
||||
for child in children:
|
||||
self.emit(name, child)
|
||||
|
||||
return True
|
||||
|
||||
# Emit objects
|
||||
if info:
|
||||
self.emit(name, info)
|
||||
else:
|
||||
self.emit(name)
|
||||
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _get_children(info):
|
||||
if type(info) is list:
|
||||
return info
|
||||
|
||||
if type(info) is not dict:
|
||||
return None
|
||||
|
||||
# Return legacy children
|
||||
if info.get('_children'):
|
||||
return info['_children']
|
||||
|
||||
# Search for modern children container
|
||||
for key, value in info.items():
|
||||
key = key.lower()
|
||||
|
||||
if (key.endswith('entry') or key.endswith('notification')) and type(value) is list:
|
||||
return value
|
||||
|
||||
return None
|
Loading…
Reference in new issue