You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/apprise/manager.py

750 lines
26 KiB

# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import re
import sys
import time
import hashlib
import inspect
import threading
from .utils import import_module
from .utils import Singleton
from .utils import parse_list
from os.path import dirname
from os.path import abspath
from os.path import join
from .logger import logger
class PluginManager(metaclass=Singleton):
"""
Designed to be a singleton object to maintain all initialized loading
of modules in memory.
"""
# Description (used for logging)
name = 'Singleton Plugin'
# Memory Space
_id = 'undefined'
# Our Module Python path name
module_name_prefix = f'apprise.{_id}'
# The module path to scan
module_path = join(abspath(dirname(__file__)), _id)
# thread safe loading
_lock = threading.Lock()
def __init__(self, *args, **kwargs):
"""
Over-ride our class instantiation to provide a singleton
"""
self._module_map = None
self._schema_map = None
# This contains a mapping of all plugins dynamicaly loaded at runtime
# from external modules such as the @notify decorator
#
# The elements here will be additionally added to the _schema_map if
# there is no conflict otherwise.
# The structure looks like the following:
# Module path, e.g. /usr/share/apprise/plugins/my_notify_hook.py
# {
# 'path': path,
#
# 'notify': {
# 'schema': {
# 'name': 'Custom schema name',
# 'fn_name': 'name_of_function_decorator_was_found_on',
# 'url': 'schema://any/additional/info/found/on/url'
# 'plugin': <CustomNotifyWrapperPlugin>
# },
# 'schema2': {
# 'name': 'Custom schema name',
# 'fn_name': 'name_of_function_decorator_was_found_on',
# 'url': 'schema://any/additional/info/found/on/url'
# 'plugin': <CustomNotifyWrapperPlugin>
# }
# }
# Note: that the <CustomNotifyWrapperPlugin> inherits from
# NotifyBase
self._custom_module_map = {}
# Track manually disabled modules (by their schema)
self._disabled = set()
# Hash of all paths previously scanned so we don't waste
# effort/overhead doing it again
self._paths_previously_scanned = set()
# Track loaded module paths to prevent from loading them again
self._loaded = set()
def unload_modules(self, disable_native=False):
"""
Reset our object and unload all modules
"""
with self._lock:
if self._custom_module_map:
# Handle Custom Module Assignments
for meta in self._custom_module_map.values():
if meta['name'] not in self._module_map:
# Nothing to remove
continue
# For the purpose of tidying up un-used modules in memory
loaded = [m for m in sys.modules.keys()
if m.startswith(
self._module_map[meta['name']]['path'])]
for module_path in loaded:
del sys.modules[module_path]
# Reset disabled plugins (if any)
for schema in self._disabled:
self._schema_map[schema].enabled = True
self._disabled.clear()
# Reset our variables
self._schema_map = {}
self._custom_module_map = {}
if disable_native:
self._module_map = {}
else:
self._module_map = None
self._loaded = set()
# Reset our path cache
self._paths_previously_scanned = set()
def load_modules(self, path=None, name=None, force=False):
"""
Load our modules into memory
"""
# Default value
module_name_prefix = self.module_name_prefix if name is None else name
module_path = self.module_path if path is None else path
with self._lock:
if not force and module_path in self._loaded:
# We're done
return
# Our base reference
module_count = len(self._module_map) if self._module_map else 0
schema_count = len(self._schema_map) if self._schema_map else 0
if not self:
# Initialize our maps
self._module_map = {}
self._schema_map = {}
self._custom_module_map = {}
# Used for the detection of additional Notify Services objects
# The .py extension is optional as we support loading directories
# too
module_re = re.compile(
r'^(?P<name>' + self.fname_prefix + r'[a-z0-9]+)(\.py)?$',
re.I)
t_start = time.time()
for f in os.listdir(module_path):
tl_start = time.time()
match = module_re.match(f)
if not match:
# keep going
continue
elif match.group('name') == f'{self.fname_prefix}Base':
# keep going
continue
# Store our notification/plugin name:
module_name = match.group('name')
module_pyname = '{}.{}'.format(module_name_prefix, module_name)
if module_name in self._module_map:
logger.warning(
"%s(s) (%s) already loaded; ignoring %s",
self.name, module_name, os.path.join(module_path, f))
continue
try:
module = __import__(
module_pyname,
globals(), locals(),
fromlist=[module_name])
except ImportError:
# No problem, we can try again another way...
module = import_module(
os.path.join(module_path, f), module_pyname)
if not module:
# logging found in import_module and not needed here
continue
if not hasattr(module, module_name):
# Not a library we can load as it doesn't follow the simple
# rule that the class must bear the same name as the
# notification file itself.
logger.trace(
"%s (%s) import failed; no filename/Class "
"match found in %s",
self.name, module_name, os.path.join(module_path, f))
continue
# Get our plugin
plugin = getattr(module, module_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
logger.trace(
"(%s) import failed; no app_id defined in %s",
self.name, module_name, os.path.join(module_path, f))
continue
# Add our plugin name to our module map
self._module_map[module_name] = {
'plugin': set([plugin]),
'module': module,
'path': '{}.{}'.format(module_name_prefix, module_name),
'native': True,
}
fn = getattr(plugin, 'schemas', None)
schemas = set([]) if not callable(fn) else fn(plugin)
# map our schema to our plugin
for schema in schemas:
if schema in self._schema_map:
logger.error(
"{} schema ({}) mismatch detected - {} to {}"
.format(self.name, schema, self._schema_map,
plugin))
continue
# Assign plugin
self._schema_map[schema] = plugin
logger.trace(
'{} {} loaded in {:.6f}s'.format(
self.name, module_name, (time.time() - tl_start)))
# Track the directory loaded so we never load it again
self._loaded.add(module_path)
logger.debug(
'{} {}(s) and {} Schema(s) loaded in {:.4f}s'
.format(
self.name,
len(self._module_map) - module_count,
len(self._schema_map) - schema_count,
(time.time() - t_start)))
def module_detection(self, paths, cache=True):
"""
Leverage the @notify decorator and load all objects found matching
this.
"""
# A simple restriction that we don't allow periods in the filename at
# all so it can't be hidden (Linux OS's) and it won't conflict with
# Python path naming. This also prevents us from loading any python
# file that starts with an underscore or dash
# We allow for __init__.py as well
module_re = re.compile(
r'^(?P<name>[_a-z0-9][a-z0-9._-]+)?(\.py)?$', re.I)
# Validate if we're a loadable Python file or not
valid_python_file_re = re.compile(r'.+\.py(o|c)?$', re.IGNORECASE)
if isinstance(paths, str):
paths = [paths, ]
if not paths or not isinstance(paths, (tuple, list)):
# We're done
return
def _import_module(path):
# Since our plugin name can conflict (as a module) with another
# we want to generate random strings to avoid steping on
# another's namespace
if not (path and valid_python_file_re.match(path)):
# Ignore file/module type
logger.trace('Plugin Scan: Skipping %s', path)
return
t_start = time.time()
module_name = hashlib.sha1(path.encode('utf-8')).hexdigest()
module_pyname = "{prefix}.{name}".format(
prefix='apprise.custom.module', name=module_name)
if module_pyname in self._custom_module_map:
# First clear out existing entries
for schema in \
self._custom_module_map[module_pyname]['notify']\
.keys():
# Remove any mapped modules to this file
del self._schema_map[schema]
# Reset
del self._custom_module_map[module_pyname]
# Load our module
module = import_module(path, module_pyname)
if not module:
# No problem, we can't use this object
logger.warning('Failed to load custom module: %s', _path)
return
# Print our loaded modules if any
if module_pyname in self._custom_module_map:
logger.debug(
'Custom module %s - %d schema(s) (name=%s) '
'loaded in %.6fs', _path,
len(self._custom_module_map[module_pyname]['notify']),
module_name, (time.time() - t_start))
# Add our plugin name to our module map
self._module_map[module_name] = {
'plugin': set(),
'module': module,
'path': module_pyname,
'native': False,
}
for schema, meta in\
self._custom_module_map[module_pyname]['notify']\
.items():
# For mapping purposes; map our element in our main list
self._module_map[module_name]['plugin'].add(
self._schema_map[schema])
# Log our success
logger.info('Loaded custom notification: %s://', schema)
else:
# The code reaches here if we successfully loaded the Python
# module but no hooks/triggers were found. So we can safely
# just remove/ignore this entry
del sys.modules[module_pyname]
return
# end of _import_module()
return
for _path in paths:
path = os.path.abspath(os.path.expanduser(_path))
if (cache and path in self._paths_previously_scanned) \
or not os.path.exists(path):
# We're done as we've already scanned this
continue
# Store our path as a way of hashing it has been handled
self._paths_previously_scanned.add(path)
if os.path.isdir(path) and not \
os.path.isfile(os.path.join(path, '__init__.py')):
logger.debug('Scanning for custom plugins in: %s', path)
for entry in os.listdir(path):
re_match = module_re.match(entry)
if not re_match:
# keep going
logger.trace('Plugin Scan: Ignoring %s', entry)
continue
new_path = os.path.join(path, entry)
if os.path.isdir(new_path):
# Update our path
new_path = os.path.join(path, entry, '__init__.py')
if not os.path.isfile(new_path):
logger.trace(
'Plugin Scan: Ignoring %s',
os.path.join(path, entry))
continue
if not cache or \
(cache and new_path not in
self._paths_previously_scanned):
# Load our module
_import_module(new_path)
# Add our subdir path
self._paths_previously_scanned.add(new_path)
else:
if os.path.isdir(path):
# This logic is safe to apply because we already
# validated the directories state above; update our
# path
path = os.path.join(path, '__init__.py')
if cache and path in self._paths_previously_scanned:
continue
self._paths_previously_scanned.add(path)
# directly load as is
re_match = module_re.match(os.path.basename(path))
# must be a match and must have a .py extension
if not re_match or not re_match.group(1):
# keep going
logger.trace('Plugin Scan: Ignoring %s', path)
continue
# Load our module
_import_module(path)
return None
def add(self, plugin, schemas=None, url=None, send_func=None):
"""
Ability to manually add Notification services to our stack
"""
if not self:
# Lazy load
self.load_modules()
# Acquire a list of schemas
p_schemas = parse_list(plugin.secure_protocol, plugin.protocol)
if isinstance(schemas, str):
schemas = [schemas, ]
elif schemas is None:
# Default
schemas = p_schemas
if not schemas or not isinstance(schemas, (set, tuple, list)):
# We're done
logger.error(
'The schemas provided (type %s) is unsupported; '
'loaded from %s.',
type(schemas),
send_func.__name__ if send_func else plugin.__class__.__name__)
return False
# Convert our schemas into a set
schemas = set([s.lower() for s in schemas]) | set(p_schemas)
# Valdation
conflict = [s for s in schemas if s in self]
if conflict:
# we're already handling this schema
logger.warning(
'The schema(s) (%s) are already defined and could not be '
'loaded from %s%s.',
', '.join(conflict),
'custom notify function ' if send_func else '',
send_func.__name__ if send_func else plugin.__class__.__name__)
return False
if send_func:
# Acquire the function name
fn_name = send_func.__name__
# Acquire the python filename path
path = inspect.getfile(send_func)
# Acquire our path to our module
module_name = str(send_func.__module__)
if module_name not in self._custom_module_map:
# Support non-dynamic includes as well...
self._custom_module_map[module_name] = {
# Name can be useful for indexing back into the
# _module_map object; this is the key to do it with:
'name': module_name.split('.')[-1],
# The path to the module loaded
'path': path,
# Initialize our template
'notify': {},
}
for schema in schemas:
self._custom_module_map[module_name]['notify'][schema] = {
# The name of the send function the @notify decorator
# wrapped
'fn_name': fn_name,
# The URL that was provided in the @notify decorator call
# associated with the 'on='
'url': url,
}
else:
module_name = hashlib.sha1(
''.join(schemas).encode('utf-8')).hexdigest()
module_pyname = "{prefix}.{name}".format(
prefix='apprise.adhoc.module', name=module_name)
# Add our plugin name to our module map
self._module_map[module_name] = {
'plugin': set([plugin]),
'module': None,
'path': module_pyname,
'native': False,
}
for schema in schemas:
# Assign our mapping
self._schema_map[schema] = plugin
return True
def remove(self, *schemas):
"""
Removes a loaded element (if defined)
"""
if not self:
# Lazy load
self.load_modules()
for schema in schemas:
try:
del self[schema]
except KeyError:
pass
def plugins(self, include_disabled=True):
"""
Return all of our loaded plugins
"""
if not self:
# Lazy load
self.load_modules()
for module in self._module_map.values():
for plugin in module['plugin']:
if not include_disabled and not plugin.enabled:
continue
yield plugin
def schemas(self, include_disabled=True):
"""
Return all of our loaded schemas
if include_disabled == True, then even disabled notifications are
returned
"""
if not self:
# Lazy load
self.load_modules()
# Return our list
return list(self._schema_map.keys()) if include_disabled else \
[s for s in self._schema_map.keys() if self._schema_map[s].enabled]
def disable(self, *schemas):
"""
Disables the modules associated with the specified schemas
"""
if not self:
# Lazy load
self.load_modules()
for schema in schemas:
if schema not in self._schema_map:
continue
if not self._schema_map[schema].enabled:
continue
# Disable
self._schema_map[schema].enabled = False
self._disabled.add(schema)
def enable_only(self, *schemas):
"""
Disables the modules associated with the specified schemas
"""
if not self:
# Lazy load
self.load_modules()
# convert to set for faster indexing
schemas = set(schemas)
for plugin in self.plugins():
# Get our plugin's schema list
p_schemas = set(
parse_list(plugin.secure_protocol, plugin.protocol))
if not schemas & p_schemas:
if plugin.enabled:
# Disable it (only if previously enabled); this prevents us
# from adjusting schemas that were disabled due to missing
# libraries or other environment reasons
plugin.enabled = False
self._disabled |= p_schemas
continue
# If we reach here, our schema was flagged to be enabled
if p_schemas & self._disabled:
# Previously disabled; no worries, let's clear this up
self._disabled -= p_schemas
plugin.enabled = True
def __contains__(self, schema):
"""
Checks if a schema exists
"""
if not self:
# Lazy load
self.load_modules()
return schema in self._schema_map
def __delitem__(self, schema):
if not self:
# Lazy load
self.load_modules()
# Get our plugin (otherwise we throw a KeyError) which is
# intended on del action that doesn't align
plugin = self._schema_map[schema]
# Our list of all schema entries
p_schemas = set([schema])
for key in list(self._module_map.keys()):
if plugin in self._module_map[key]['plugin']:
# Remove our plugin
self._module_map[key]['plugin'].remove(plugin)
# Custom Plugin Entry; Clean up cross reference
module_pyname = self._module_map[key]['path']
if not self._module_map[key]['native'] and \
module_pyname in self._custom_module_map:
del self.\
_custom_module_map[module_pyname]['notify'][schema]
if not self._custom_module_map[module_pyname]['notify']:
#
# Last custom loaded element
#
# Free up custom object entry
del self._custom_module_map[module_pyname]
if not self._module_map[key]['plugin']:
#
# Last element
#
if self._module_map[key]['native']:
# Get our plugin's schema list
p_schemas = \
set([s for s in parse_list(
plugin.secure_protocol, plugin.protocol)
if s in self._schema_map])
# free system memory
if self._module_map[key]['module']:
del sys.modules[self._module_map[key]['path']]
# free last remaining pointer in module map
del self._module_map[key]
for schema in p_schemas:
# Final Tidy
del self._schema_map[schema]
def __setitem__(self, schema, plugin):
"""
Support fast assigning of Plugin/Notification Objects
"""
if not self:
# Lazy load
self.load_modules()
# Set default values if not otherwise set
if not plugin.service_name:
# Assign service name if one doesn't exist
plugin.service_name = f'{schema}://'
p_schemas = set(
parse_list(plugin.secure_protocol, plugin.protocol))
if not p_schemas:
# Assign our protocol
plugin.secure_protocol = schema
p_schemas.add(schema)
elif schema not in p_schemas:
# Add our others (if defined)
plugin.secure_protocol = \
set([schema] + parse_list(plugin.secure_protocol))
p_schemas.add(schema)
if not self.add(plugin, schemas=p_schemas):
raise KeyError('Conflicting Assignment')
def __getitem__(self, schema):
"""
Returns the indexed plugin identified by the schema specified
"""
if not self:
# Lazy load
self.load_modules()
return self._schema_map[schema]
def __iter__(self):
"""
Returns an iterator so we can iterate over our loaded modules
"""
if not self:
# Lazy load
self.load_modules()
return iter(self._module_map.values())
def __len__(self):
"""
Returns the number of modules/plugins loaded
"""
if not self:
# Lazy load
self.load_modules()
return len(self._module_map)
def __bool__(self):
"""
Determines if object has loaded or not
"""
return True if self._loaded and self._module_map is not None else False