# -*- coding: utf-8 -*-
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
# This code is licensed under the MIT License.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
import re
import os
import six
from markdown import markdown
from itertools import chain
from . common import NotifyType
from . common import NotifyFormat
from . common import MATCH_ALL_TAG
from . utils import is_exclusive_match
from . utils import parse_list
from . utils import parse_urls
from . logger import logger
from . AppriseAsset import AppriseAsset
from . AppriseConfig import AppriseConfig
from . AppriseAttachment import AppriseAttachment
from . AppriseLocale import AppriseLocale
from . config . ConfigBase import ConfigBase
from . plugins . NotifyBase import NotifyBase
from . import plugins
from . import __version__
# Python v3+ support code made importable so it can remain backwards
# compatible with Python v2
from . import py3compat
class Apprise ( object ) :
Our Notification Manager
def __init__ ( self , servers = None , asset = None , debug = False ) :
Loads a set of server urls while applying the Asset ( ) module to each
if specified .
If no asset is provided , then the default asset is used .
# Initialize a server list of URLs
self . servers = list ( )
# Assigns an central asset object that will be later passed into each
# notification plugin. Assets contain information such as the local
# directory images can be found in. It can also identify remote
# URL paths that contain the images you want to present to the end
# user. If no asset is specified, then the default one is used.
self . asset = \
asset if isinstance ( asset , AppriseAsset ) else AppriseAsset ( )
if servers :
self . add ( servers )
# Initialize our locale object
self . locale = AppriseLocale ( )
# Set our debug flag
self . debug = debug
def instantiate ( url , asset = None , tag = None , suppress_exceptions = True ) :
Returns the instance of a instantiated plugin based on the provided
Server URL . If the url fails to be parsed , then None is returned .
The specified url can be either a string ( the URL itself ) or a
dictionary containing all of the components needed to istantiate
the notification service . If identifying a dictionary , at the bare
minimum , one must specify the schema .
An example of a url dictionary object might look like :
schema : ' mailto ' ,
host : ' google.com ' ,
user : ' myuser ' ,
password : ' mypassword ' ,
Alternatively the string is much easier to specify :
mailto : / / user : mypassword @google.com
The dictionary works well for people who are calling details ( ) to
extract the components they need to build the URL manually .
# Initialize our result set
results = None
if isinstance ( url , six . string_types ) :
# Acquire our url tokens
results = plugins . url_to_dict ( url )
if results is None :
# Failed to parse the server URL; detailed logging handled
# inside url_to_dict - nothing to report here.
return None
elif isinstance ( url , dict ) :
# We already have our result set
results = url
if results . get ( ' schema ' ) not in plugins . SCHEMA_MAP :
# schema is a mandatory dictionary item as it is the only way
# we can index into our loaded plugins
logger . error ( ' Dictionary does not include a " schema " entry. ' )
logger . trace ( ' Invalid dictionary unpacked as: {} {} ' . format (
os . linesep , os . linesep . join (
[ ' {} = " {} " ' . format ( k , v ) for k , v in results . items ( ) ] ) ) )
return None
logger . trace ( ' Dictionary unpacked as: {} {} ' . format (
os . linesep , os . linesep . join (
[ ' {} = " {} " ' . format ( k , v ) for k , v in results . items ( ) ] ) ) )
else :
logger . error ( ' Invalid URL specified: {} ' . format ( url ) )
return None
# Build a list of tags to associate with the newly added notifications
results [ ' tag ' ] = set ( parse_list ( tag ) )
# Prepare our Asset Object
results [ ' asset ' ] = \
asset if isinstance ( asset , AppriseAsset ) else AppriseAsset ( )
if suppress_exceptions :
try :
# Attempt to create an instance of our plugin using the parsed
# URL information
plugin = plugins . SCHEMA_MAP [ results [ ' schema ' ] ] ( * * results )
# Create log entry of loaded URL
logger . debug ( ' Loaded {} URL: {} ' . format (
plugins . SCHEMA_MAP [ results [ ' schema ' ] ] . service_name ,
plugin . url ( ) ) )
except Exception :
# the arguments are invalid or can not be used.
logger . error ( ' Could not load {} URL: {} ' . format (
plugins . SCHEMA_MAP [ results [ ' schema ' ] ] . service_name , url ) )
return None
else :
# Attempt to create an instance of our plugin using the parsed
# URL information but don't wrap it in a try catch
plugin = plugins . SCHEMA_MAP [ results [ ' schema ' ] ] ( * * results )
return plugin
def add ( self , servers , asset = None , tag = None ) :
Adds one or more server URLs into our list .
You can override the global asset if you wish by including it with the
server ( s ) that you add .
The tag allows you to associate 1 or more tag values to the server ( s )
being added . tagging a service allows you to exclusively access them
when calling the notify ( ) function .
# Initialize our return status
return_status = True
if asset is None :
# prepare default asset
asset = self . asset
if isinstance ( servers , six . string_types ) :
# build our server list
servers = parse_urls ( servers )
if len ( servers ) == 0 :
return False
elif isinstance ( servers , dict ) :
# no problem, we support kwargs, convert it to a list
servers = [ servers ]
elif isinstance ( servers , ( ConfigBase , NotifyBase , AppriseConfig ) ) :
# Go ahead and just add our plugin into our list
self . servers . append ( servers )
return True
elif not isinstance ( servers , ( tuple , set , list ) ) :
logger . error (
" An invalid notification (type= {} ) was specified. " . format (
type ( servers ) ) )
return False
for _server in servers :
if isinstance ( _server , ( ConfigBase , NotifyBase , AppriseConfig ) ) :
# Go ahead and just add our plugin into our list
self . servers . append ( _server )
elif not isinstance ( _server , ( six . string_types , dict ) ) :
logger . error (
" An invalid notification (type= {} ) was specified. " . format (
type ( _server ) ) )
return_status = False
# Instantiate ourselves an object, this function throws or
# returns None if it fails
instance = Apprise . instantiate ( _server , asset = asset , tag = tag )
if not isinstance ( instance , NotifyBase ) :
# No logging is required as instantiate() handles failure
# and/or success reasons for us
return_status = False
# Add our initialized plugin to our server listings
self . servers . append ( instance )
# Return our status
return return_status
def clear ( self ) :
Empties our server list
self . servers [ : ] = [ ]
def find ( self , tag = MATCH_ALL_TAG ) :
Returns an list of all servers matching against the tag specified .
# Build our tag setup
# - top level entries are treated as an 'or'
# - second level (or more) entries are treated as 'and'
# examples:
# tag="tagA, tagB" = tagA or tagB
# tag=['tagA', 'tagB'] = tagA or tagB
# tag=[('tagA', 'tagC'), 'tagB'] = (tagA and tagC) or tagB
# tag=[('tagB', 'tagC')] = tagB and tagC
# Iterate over our loaded plugins
for entry in self . servers :
if isinstance ( entry , ( ConfigBase , AppriseConfig ) ) :
# load our servers
servers = entry . servers ( )
else :
servers = [ entry , ]
for server in servers :
# Apply our tag matching based on our defined logic
if is_exclusive_match (
logic = tag , data = server . tags , match_all = MATCH_ALL_TAG ) :
yield server
def notify ( self , body , title = ' ' , notify_type = NotifyType . INFO ,
body_format = None , tag = MATCH_ALL_TAG , attach = None ) :
Send a notification to all of the plugins previously loaded .
If the body_format specified is NotifyFormat . MARKDOWN , it will
be converted to HTML if the Notification type expects this .
if the tag is specified ( either a string or a set / list / tuple
of strings ) , then only the notifications flagged with that
tagged value are notified . By default all added services
are notified ( tag = MATCH_ALL_TAG )
This function returns True if all notifications were successfully
sent , False if even just one of them fails , and None if no
notifications were sent at all as a result of tag filtering and / or
simply having empty configuration files that were read .
Attach can contain a list of attachment URLs . attach can also be
represented by a an AttachBase ( ) ( or list of ) object ( s ) . This
identifies the products you wish to notify
if len ( self ) == 0 :
# Nothing to notify
return False
# Initialize our return result which only turns to True if we send
# at least one valid notification
status = None
if not ( title or body ) :
return False
# Tracks conversions
conversion_map = dict ( )
# Prepare attachments if required
if attach is not None and not isinstance ( attach , AppriseAttachment ) :
try :
attach = AppriseAttachment ( attach , asset = self . asset )
except TypeError :
# bad attachments
return False
# Allow Asset default value
body_format = self . asset . body_format \
if body_format is None else body_format
# for asyncio support; we track a list of our servers to notify
# sequentially
coroutines = [ ]
# Iterate over our loaded plugins
for server in self . find ( tag ) :
if status is None :
# We have at least one server to notify; change status
# to be a default value of True from now (purely an
# initialiation at this point)
status = True
# If our code reaches here, we either did not define a tag (it
# was set to None), or we did define a tag and the logic above
# determined we need to notify the service it's associated with
if server . notify_format not in conversion_map :
if body_format == NotifyFormat . MARKDOWN and \
server . notify_format == NotifyFormat . HTML :
# Apply Markdown
conversion_map [ server . notify_format ] = markdown ( body )
elif body_format == NotifyFormat . TEXT and \
server . notify_format == NotifyFormat . HTML :
# Basic TEXT to HTML format map; supports keys only
re_map = {
# Support Ampersand
r ' & ' : ' & ' ,
# Spaces to for formatting purposes since
# multiple spaces are treated as one an this may
# not be the callers intention
r ' ' : ' ' ,
# Tab support
r ' \ t ' : ' ' ,
# Greater than and Less than Characters
r ' > ' : ' > ' ,
r ' < ' : ' < ' ,
# Compile our map
re_table = re . compile (
r ' ( ' + ' | ' . join (
map ( re . escape , re_map . keys ( ) ) ) + r ' ) ' ,
# Execute our map against our body in addition to
# swapping out new lines and replacing them with <br/>
conversion_map [ server . notify_format ] = \
re . sub ( r ' \ r* \ n ' , ' <br/> \r \n ' ,
re_table . sub (
lambda x : re_map [ x . group ( ) ] , body ) )
else :
# Store entry directly
conversion_map [ server . notify_format ] = body
if ASYNCIO_SUPPORT and server . asset . async_mode :
# Build a list of servers requiring notification
# that will be triggered asynchronously afterwards
coroutines . append ( server . async_notify (
body = conversion_map [ server . notify_format ] ,
title = title ,
notify_type = notify_type ,
attach = attach ) )
# We gather at this point and notify at the end
try :
# Send notification
if not server . notify (
body = conversion_map [ server . notify_format ] ,
title = title ,
notify_type = notify_type ,
attach = attach ) :
# Toggle our return status flag
status = False
except TypeError :
# These our our internally thrown notifications
status = False
except Exception :
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger . exception ( " Notification Exception " )
status = False
if coroutines :
# perform our async notification(s)
if not py3compat . asyncio . notify ( coroutines , debug = self . debug ) :
# Toggle our status only if we had a failure
status = False
return status
def details ( self , lang = None ) :
Returns the details associated with the Apprise object
# general object returned
response = {
# Defines the current version of Apprise
' version ' : __version__ ,
# Lists all of the currently supported Notifications
' schemas ' : [ ] ,
# Includes the configured asset details
' asset ' : self . asset . details ( ) ,
# to add it's mapping to our hash table
for plugin in set ( plugins . SCHEMA_MAP . values ( ) ) :
# Standard protocol(s) should be None or a tuple
protocols = getattr ( plugin , ' protocol ' , None )
if isinstance ( protocols , six . string_types ) :
protocols = ( protocols , )
# Secure protocol(s) should be None or a tuple
secure_protocols = getattr ( plugin , ' secure_protocol ' , None )
if isinstance ( secure_protocols , six . string_types ) :
secure_protocols = ( secure_protocols , )
if not lang :
# Simply return our results
details = plugins . details ( plugin )
else :
# Emulate the specified language when returning our results
with self . locale . lang_at ( lang ) :
details = plugins . details ( plugin )
# Build our response object
response [ ' schemas ' ] . append ( {
' service_name ' : getattr ( plugin , ' service_name ' , None ) ,
' service_url ' : getattr ( plugin , ' service_url ' , None ) ,
' setup_url ' : getattr ( plugin , ' setup_url ' , None ) ,
' protocols ' : protocols ,
' secure_protocols ' : secure_protocols ,
' details ' : details ,
} )
return response
def urls ( self ) :
Returns all of the loaded URLs defined in this apprise object .
return [ x . url ( ) for x in self . servers ]
def pop ( self , index ) :
Removes an indexed Notification Service from the stack and returns it .
The thing is we can never pop AppriseConfig ( ) entries , only what was
loaded within them . So pop needs to carefully iterate over our list
and only track actual entries .
# Tracking variables
prev_offset = - 1
offset = prev_offset
for idx , s in enumerate ( self . servers ) :
if isinstance ( s , ( ConfigBase , AppriseConfig ) ) :
servers = s . servers ( )
if len ( servers ) > 0 :
# Acquire a new maximum offset to work with
offset = prev_offset + len ( servers )
if offset > = index :
# we can pop an element from our config stack
fn = s . pop if isinstance ( s , ConfigBase ) \
else s . server_pop
return fn ( index if prev_offset == - 1
else ( index - prev_offset - 1 ) )
else :
offset = prev_offset + 1
if offset == index :
return self . servers . pop ( idx )
# Update our old offset
prev_offset = offset
# If we reach here, then we indexed out of range
raise IndexError ( ' list index out of range ' )
def __getitem__ ( self , index ) :
Returns the indexed server entry of a loaded notification server
# Tracking variables
prev_offset = - 1
offset = prev_offset
for idx , s in enumerate ( self . servers ) :
if isinstance ( s , ( ConfigBase , AppriseConfig ) ) :
# Get our list of servers associate with our config object
servers = s . servers ( )
if len ( servers ) > 0 :
# Acquire a new maximum offset to work with
offset = prev_offset + len ( servers )
if offset > = index :
return servers [ index if prev_offset == - 1
else ( index - prev_offset - 1 ) ]
else :
offset = prev_offset + 1
if offset == index :
return self . servers [ idx ]
# Update our old offset
prev_offset = offset
# If we reach here, then we indexed out of range
raise IndexError ( ' list index out of range ' )
def __bool__ ( self ) :
Allows the Apprise object to be wrapped in an Python 3. x based ' if
statement ' . True is returned if at least one service has been loaded.
return len ( self ) > 0
def __nonzero__ ( self ) :
Allows the Apprise object to be wrapped in an Python 2. x based ' if
statement ' . True is returned if at least one service has been loaded.
return len ( self ) > 0
def __iter__ ( self ) :
Returns an iterator to each of our servers loaded . This includes those
found inside configuration .
return chain ( * [ [ s ] if not isinstance ( s , ( ConfigBase , AppriseConfig ) )
else iter ( s . servers ( ) ) for s in self . servers ] )
def __len__ ( self ) :
Returns the number of servers loaded ; this includes those found within
loaded configuration . This funtion nnever actually counts the
Config entry themselves ( if they exist ) , only what they contain .
return sum ( [ 1 if not isinstance ( s , ( ConfigBase , AppriseConfig ) )
else len ( s . servers ( ) ) for s in self . servers ] )