# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# All of the documentation needed to work with the Twist API can be found
# here: https://developer.twist.com/v3/
import re
import requests
from json import loads
from itertools import chain
from . NotifyBase import NotifyBase
from . . URLBase import PrivacyMode
from . . common import NotifyFormat
from . . common import NotifyType
from . . utils import parse_list
from . . utils import is_email
from . . AppriseLocale import gettext_lazy as _
# Matches a channel specified by name, optionally prefixed with a pound
# symbol and/or a workspace: [#][<workspace>:]<channel>
# A workspace can also be interpreted as a team name too!
IS_CHANNEL = re.compile(
    r'^#?(?P<name>((?P<workspace>[A-Za-z0-9_-]+):)?'
    r'(?P<channel>[^\s]{1,64}))$')

# Matches a channel specified by its numeric id, optionally prefixed with
# a numeric workspace id: [<workspace_id>:]<channel_id>
IS_CHANNEL_ID = re.compile(
    r'^(?P<name>((?P<workspace>[0-9]+):)?(?P<channel>[0-9]+))$')

# Used to break apart list of potential tags by their delimiter
# into a usable list.
LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
class NotifyTwist(NotifyBase):
    """
    A wrapper for Notify Twist Notifications

    Targets may be given either as channel names (optionally prefixed with
    a workspace name, e.g. ``#workspace:channel``) or as numeric channel
    ids (optionally prefixed with a numeric workspace id).  Channel names
    are translated to ids through the Twist API before a notification is
    sent.
    """

    # The default descriptive name associated with the Notification
    service_name = 'Twist'

    # The services URL
    service_url = 'https://twist.com'

    # The default secure protocol
    secure_protocol = 'twist'

    # A URL that takes you to the setup/help of the specific protocol
    setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twist'

    # The maximum size of the message
    body_maxlen = 1000

    # Default to markdown
    notify_format = NotifyFormat.MARKDOWN

    # The default Notification URL to use
    api_url = 'https://api.twist.com/api/v3/'

    # Allow 300 requests per minute.
    # 60/300 = 0.2
    request_rate_per_sec = 0.2

    # The default channel to notify if no targets are specified
    default_notification_channel = 'general'

    # Define object templates
    templates = (
        '{schema}://{password}:{email}',
        '{schema}://{password}:{email}/{targets}',
    )

    # Define our template tokens
    template_tokens = dict(NotifyBase.template_tokens, **{
        'password': {
            'name': _('Password'),
            'type': 'string',
            'private': True,
        },
        'email': {
            'name': _('Email'),
            'type': 'string',
        },
        'target_channel': {
            'name': _('Target Channel'),
            'type': 'string',
            'prefix': '#',
            'map_to': 'targets',
        },
        'target_channel_id': {
            'name': _('Target Channel ID'),
            'type': 'string',
            'map_to': 'targets',
        },
        'targets': {
            'name': _('Targets'),
            'type': 'list:string',
        },
    })

    # Define our template arguments
    template_args = dict(NotifyBase.template_args, **{
        'to': {
            'alias_of': 'targets',
        },
    })

    def __init__(self, email=None, targets=None, **kwargs):
        """
        Initialize Notify Twist Object

        Args:
            email: the account email; when omitted it is assembled from the
                user and host held by the base object.
            targets: channel names and/or channel ids to notify; entries
                matching neither format are dropped with a warning.
            **kwargs: passed through to the NotifyBase constructor.

        Raises:
            TypeError: if the email is invalid or no password was provided.
        """
        super(NotifyTwist, self).__init__(**kwargs)

        # Initialize channels list
        self.channels = set()

        # Initialize Channel ID which are stored as:
        #  <workspace_id>:<channel_id>
        self.channel_ids = set()

        # The token is None if we're not logged in and False if we
        # failed to log in.  Otherwise it is set to the actual token
        self.token = None

        # Our default workspace (associated with our token)
        self.default_workspace = None

        # A mapping of all of the available workspaces:
        #  { <workspace_name>: <workspace_id>, ... }
        self._cached_workspaces = dict()

        # A mapping of channel names, the layout is as follows:
        #  {
        #     <workspace_id>: {
        #        <channel_name>: <channel_id>,
        #        <channel_name>: <channel_id>,
        #        ...
        #     },
        #     <workspace2_id>: {
        #        <channel_name>: <channel_id>,
        #        <channel_name>: <channel_id>,
        #        ...
        #     },
        #  }
        self._cached_channels = dict()

        # Initialize our Email Object
        self.email = email if email else '{}@{}'.format(
            self.user,
            self.host,
        )

        # Check if it is valid
        result = is_email(self.email)
        if not result:
            # let outer exception handle this
            msg = 'The Twist Auth email specified ({}) is invalid.'\
                .format(self.email)
            self.logger.warning(msg)
            raise TypeError(msg)

        # Re-assign email based on what was parsed
        self.email = result['full_email']
        if email:
            # Force user/host to be that of the defined email for
            # consistency. This is very important for those initializing
            # this object with the email argument, which could otherwise
            # be inconsistent with the contents of the NotifyBase() object
            self.user = result['user']
            self.host = result['domain']

        if not self.password:
            msg = 'No Twist password was specified with account: {}'\
                .format(self.email)
            self.logger.warning(msg)
            raise TypeError(msg)

        # Validate recipients and drop bad ones:
        for recipient in parse_list(targets):
            # Numeric ids are tested first since a purely numeric value
            # would also satisfy the (looser) channel name expression
            result = IS_CHANNEL_ID.match(recipient)
            if result:
                # store valid channel id
                self.channel_ids.add(result.group('name'))
                continue

            result = IS_CHANNEL.match(recipient)
            if result:
                # store valid channel name (lowercase for comparisons)
                self.channels.add(result.group('name').lower())
                continue

            self.logger.warning(
                'Dropped invalid channel/id '
                '({}) specified.'.format(recipient),
            )

        if not self.channels and not self.channel_ids:
            # Notify our default channel
            self.channels.add(self.default_notification_channel)
            self.logger.warning(
                'Added default notification channel {}'.format(
                    self.default_notification_channel))

    def url(self, privacy=False, *args, **kwargs):
        """
        Returns the URL built dynamically based on specified arguments.
        """

        # Our URL parameters
        params = self.url_parameters(privacy=privacy, *args, **kwargs)

        return '{schema}://{password}:{user}@{host}/{targets}/' \
            '?{params}'.format(
                schema=self.secure_protocol,
                password=self.pprint(
                    self.password, privacy, mode=PrivacyMode.Secret,
                    safe=''),
                user=self.quote(self.user, safe=''),
                host=self.host,
                targets='/'.join(
                    [NotifyTwist.quote(x, safe='') for x in chain(
                        # Channels are prefixed with a pound/hashtag symbol
                        ['#{}'.format(x) for x in self.channels],
                        # Channel IDs
                        self.channel_ids,
                    )]),
                params=NotifyTwist.urlencode(params),
            )

    def login(self):
        """
        A simple wrapper to authenticate with the Twist Server

        On success the token and default workspace are stored on the
        object and True is returned.  On failure the token is set to False
        and False is returned.  The workspace/channel caches are always
        reset.
        """

        # Prepare our payload
        payload = {
            'email': self.email,
            'password': self.password,
        }

        # Reset our default workspace
        self.default_workspace = None

        # Reset our cached objects
        self._cached_workspaces = dict()
        self._cached_channels = dict()

        # Send Login Information
        postokay, response = self._fetch(
            'users/login',
            payload=payload,
            # We set this boolean so internal recursion doesn't take place.
            login=True,
        )

        # A usable response is a non-empty JSON object; anything else
        # (failure, empty content, or an unexpected JSON type) is treated
        # as a failed login
        if not postokay or not response or not isinstance(response, dict):
            # Setting this variable to False as a way of letting us know
            # we failed to authenticate on our last attempt
            self.token = False
            return False

        # Our response object looks like this (content has been altered for
        # presentation purposes):
        # {
        #   "contact_info": null,
        #   "profession": null,
        #   "timezone": "UTC",
        #   "avatar_id": null,
        #   "id": 123456,
        #   "first_name": "Jordan",
        #   "comet_channel":
        #       "124371-34be423219130343030d4ec0a3dabbbbbe565eee",
        #   "restricted": false,
        #   "default_workspace": 92020,
        #   "snooze_dnd_end": null,
        #   "email": "user@example.com",
        #   "comet_server": "https://comet.twist.com",
        #   "snooze_until": null,
        #   "lang": "en",
        #   "feature_flags": [],
        #   "short_name": "Jordan P.",
        #   "away_mode": null,
        #   "time_format": "12",
        #   "client_id": "cb01f37e-a5b2-13e9-ba2a-023a33d10dc0",
        #   "removed": false,
        #   "emails": [
        #     {
        #       "connected": [],
        #       "email": "user@example.com",
        #       "primary": true
        #     }
        #   ],
        #   "scheduled_banners": [
        #     "threads_3",
        #     "threads_1",
        #     "notification_permissions",
        #     "search_1",
        #     "messages_1",
        #     "team_1",
        #     "inbox_2",
        #     "inbox_1"
        #   ],
        #   "snooze_dnd_start": null,
        #   "name": "Jordan Peterson",
        #   "off_days": [],
        #   "bot": false,
        #   "token": "2e82c1e4e8b0091fdaa34ff3972351821406f796",
        #   "snoozed": false,
        #   "setup_pending": false,
        #   "date_format": "MM/DD/YYYY"
        # }

        # Store our default workspace
        self.default_workspace = response.get('default_workspace')

        # Acquire our token
        self.token = response.get('token')

        self.logger.info('Authenticated to Twist as {}'.format(self.email))
        return True

    def logout(self):
        """
        A simple wrapper to log out of the server

        Always returns True; a failed log out request is not treated as an
        error.
        """

        # getattr() is used because this method is reached from __del__(),
        # which can run on an object whose __init__() raised before the
        # token attribute was assigned
        if not getattr(self, 'token', None):
            # Nothing more to do
            return True

        # Send Logout Message; the outcome is intentionally ignored
        self._fetch('users/logout')

        # reset our token
        self.token = None

        # There is no need to handle failed log out attempts at this time
        return True

    def get_workspaces(self):
        """
        Returns all workspaces associated with this user account as a
        dictionary.

        This returned object is either an empty dictionary or one that
        looks like this:
         {
             'workspace': <workspace_id>,
             'workspace': <workspace_id>,
             'workspace': <workspace_id>,
         }

        All workspaces are made lowercase for comparison purposes
        """
        if not self.token and not self.login():
            # Nothing more to do
            return dict()

        postokay, response = self._fetch('workspaces/get')
        if not postokay or not isinstance(response, list):
            # We failed to retrieve
            return dict()

        # The response object looks like so:
        #    [
        #      {
        #        "created_ts": 1563044447,
        #        "name": "apprise",
        #        "creator": 123571,
        #        "color": 1,
        #        "default_channel": 13245,
        #        "plan": "free",
        #        "default_conversation": 63022,
        #        "id": 12345
        #      }
        #    ]

        # Knowing our response, we can map each (lowercase) workspace name
        # to its id
        return {
            entry.get('name', '').lower(): entry.get('id', '')
            for entry in response
        }

    def get_channels(self, wid):
        """
        Simply returns the channel objects associated with the specified
        workspace id.

        This returned object is either an empty dictionary or one that
        looks like this:
         {
             'channel1': <channel_id>,
             'channel2': <channel_id>,
             'channel3': <channel_id>,
         }

        All channels are made lowercase for comparison purposes
        """
        if not self.token and not self.login():
            # Nothing more to do
            return {}

        payload = {'workspace_id': wid}
        postokay, response = self._fetch(
            'channels/get', payload=payload)

        if not postokay or not isinstance(response, list):
            # We failed to retrieve
            return {}

        # Response looks like this:
        #     [
        #       {
        #         "id": 123,
        #         "name": "General",
        #         "workspace_id": 12345,
        #         "color": 1,
        #         "description": "",
        #         "archived": false,
        #         "public": true,
        #         "user_ids": [
        #           8754
        #         ],
        #         "created_ts": 1563044447,
        #         "creator": 123571
        #       }
        #     ]
        #
        # Knowing our response, we can map each (lowercase) channel name
        # to its id
        return {
            entry.get('name', '').lower(): entry.get('id', '')
            for entry in response
        }

    def _channel_migration(self):
        """
        Translates every queued channel name into its
        <workspace_id>:<channel_id> form and moves it into the channel id
        set.  The set of channel names is emptied in the process.

        A cache lookup has overhead, and is only required to be performed
        if the user specified channels by their string value.

        Returns True if every channel was resolved and False if we could
        not authenticate or at least one channel could not be resolved.
        """

        if not self.token and not self.login():
            # Nothing more to do
            return False

        if not self.channels:
            # Nothing to do; take an early exit
            return True

        # initialize our error tracking
        has_error = False

        while self.channels:
            # Pop our channel off of the stack
            entry = self.channels.pop()
            result = IS_CHANNEL.match(entry)
            if not result:
                # Entries are validated when the object is initialized; we
                # can only get here if the set was altered afterwards
                self.logger.warning(
                    'Dropped invalid channel/id '
                    '({}) specified.'.format(entry),
                )
                has_error = True
                continue

            # Populate our key variables
            workspace = result.group('workspace')
            channel = result.group('channel').lower()

            # Acquire our workspace_id if we can
            if workspace:
                # We always work with the workspace in its lowercase form
                workspace = workspace.lower()

                # A workspace was defined
                if not self._cached_workspaces:
                    # cache our workspaces; this only needs to be done once
                    self._cached_workspaces = self.get_workspaces()

                if workspace not in self._cached_workspaces:
                    # not found
                    self.logger.warning(
                        'The Twist User {} is not associated with the '
                        'Team {}'.format(self.email, workspace))

                    # Toggle our return flag
                    has_error = True
                    continue

                # Store the workspace id
                workspace_id = self._cached_workspaces[workspace]

            else:
                # use default workspace
                workspace_id = self.default_workspace

            # Fetch (once) the channels of the workspace we are about to
            # search; this covers the default workspace as well as any
            # workspace explicitly named in the target
            if workspace_id and workspace_id not in self._cached_channels:
                self._cached_channels[workspace_id] = \
                    self.get_channels(workspace_id)

            # Check to see if our channel exists in the workspace
            if workspace_id in self._cached_channels \
                    and channel in self._cached_channels[workspace_id]:
                # Store our channel ID
                self.channel_ids.add('{}:{}'.format(
                    workspace_id,
                    self._cached_channels[workspace_id][channel],
                ))
                continue

            # if we reach here, we failed to add our channel
            self.logger.warning(
                'The Channel #{} was not found{}.'.format(
                    channel,
                    '' if not workspace
                    else ' with Team {}'.format(workspace),
                ))

            # Toggle our return flag
            has_error = True

        # Report whether every channel could be resolved
        return not has_error

    def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
        """
        Perform Twist Notification

        A new thread is added to every resolved channel.  Returns True
        only if every request succeeded.
        """

        # error tracking (used for function return)
        has_error = False

        if not self.token and not self.login():
            # We failed to authenticate - we're done
            return False

        if self.channels:
            # Converts channels to their mapped IDs if found; this is the
            # only way to send notifications to Twist.
            # NOTE: the result is not inspected; channels that could not
            # be resolved are logged but do not by themselves fail send()
            self._channel_migration()

        if not self.channel_ids:
            # We have nothing to notify
            self.logger.warning('There are no Twist targets to notify')
            return False

        # Notify all of our identified channels; a copy is iterated so the
        # stored set is left untouched for subsequent calls
        for entry in list(self.channel_ids):
            # Retrieve our Channel Object
            result = IS_CHANNEL_ID.match(entry)
            if not result:
                # Can occur if the API returned a channel without an id
                self.logger.warning(
                    'Dropped invalid channel/id '
                    '({}) specified.'.format(entry),
                )
                has_error = True
                continue

            # Only the channel id is required by the API call
            channel_id = int(result.group('channel'))

            # Prepare our payload
            payload = {
                'channel_id': channel_id,
                'title': title,
                'content': body,
            }

            postokay, _response = self._fetch(
                'threads/add',
                payload=payload,
            )

            # only toggle has_error flag if we had an error
            if not postokay:
                # Mark our failure
                has_error = True
                continue

            # If we reach here, we were successful
            self.logger.info(
                'Sent Twist notification to {}.'.format(
                    result.group('name')))

        return not has_error

    @staticmethod
    def _load_json(response):
        """
        Returns the decoded JSON body of a requests response, or an empty
        dictionary if the body could not be decoded.
        """
        try:
            return loads(response.content)

        except (TypeError, ValueError, AttributeError):
            # TypeError = response.content is not a String
            # ValueError = response.content is Unparsable
            # AttributeError = response.content is None
            return {}

    def _fetch(self, url, payload=None, method='POST', login=False):
        """
        Wrapper to Twist API requests object

        Args:
            url: the API path appended to the base api_url.
            payload: form data to send with the request.
            method: 'POST' issues a POST; any other value issues a GET.
            login: set to True by login() so that an authentication
                failure does not trigger another (recursive) login.

        Returns:
            A (success, content) tuple where content is the decoded JSON
            response (or an empty dictionary when it can not be decoded).
        """

        # Build our headers
        headers = {
            'User-Agent': self.app_id,
            'Content-Type':
                'application/x-www-form-urlencoded; charset=utf-8',
        }

        if self.token:
            # Set our token
            headers['Authorization'] = 'Bearer {}'.format(self.token)

        # Prepare our api url
        api_url = '{}{}'.format(self.api_url, url)

        # Some Debug Logging
        self.logger.debug('Twist {} URL: {} (cert_verify={})'.format(
            method, api_url, self.verify_certificate))
        self.logger.debug('Twist Payload: %s' % str(payload))

        # Always call throttle before any remote server i/o is made;
        self.throttle()

        # Initialize a default value for our content value
        content = {}

        # acquire our request mode
        fn = requests.post if method == 'POST' else requests.get
        try:
            r = fn(
                api_url,
                data=payload,
                headers=headers,
                verify=self.verify_certificate,
                timeout=self.request_timeout,
            )

            # Get our JSON content if it's possible
            content = self._load_json(r)

            # handle authentication errors where our token has just simply
            # expired. The error response content looks like this:
            # {
            #   "error_code": 200,
            #   "error_uuid": "af80bd0715434231a649f2258d7fb946",
            #   "error_extra": {},
            #   "error_string": "Invalid token"
            # }
            #
            # Authentication related codes:
            #  120 = You are not logged in
            #  200 = Invalid Token
            #
            # Source: https://developer.twist.com/v3/#errors
            #
            # We attempt to login again and retry the original request
            # if we aren't in the process of handling a login already
            if r.status_code != requests.codes.ok and login is False \
                    and isinstance(content, dict) and \
                    content.get('error_code') in (120, 200):
                # We failed to authenticate with our token; login one more
                # time and retry this original request
                if self.login():
                    # The headers prepared above still carry the token
                    # that was just rejected; swap in the new one
                    if self.token:
                        headers['Authorization'] = \
                            'Bearer {}'.format(self.token)
                    else:
                        headers.pop('Authorization', None)

                    r = fn(
                        api_url,
                        data=payload,
                        headers=headers,
                        verify=self.verify_certificate,
                        timeout=self.request_timeout,
                    )

                    # Get our JSON content if it's possible
                    content = self._load_json(r)

            if r.status_code != requests.codes.ok:
                # We had a problem
                status_str = \
                    NotifyTwist.http_response_code_lookup(r.status_code)

                self.logger.warning(
                    'Failed to send Twist {} to {}: '
                    '{}{}error={}.'.format(
                        method,
                        api_url,
                        status_str,
                        ', ' if status_str else '',
                        r.status_code))

                self.logger.debug(
                    'Response Details:\r\n{}'.format(r.content))

                # Mark our failure
                return (False, content)

        except requests.RequestException as e:
            self.logger.warning(
                'Exception received when sending Twist {} to {}: '.
                format(method, api_url))
            self.logger.debug('Socket Exception: %s' % str(e))

            # Mark our failure
            return (False, content)

        return (True, content)

    @staticmethod
    def parse_url(url):
        """
        Parses the URL and returns enough arguments that can allow
        us to re-instantiate this object.

        Returns None if the URL carries no user, or no password can be
        determined.
        """
        results = NotifyBase.parse_url(url)
        if not results:
            # We're done early as we couldn't load the results
            return results

        if not results.get('user'):
            # A username is required
            return None

        # Acquire our targets
        results['targets'] = NotifyTwist.split_path(results['fullpath'])

        if not results.get('password'):
            # Password is required; we will accept the very first entry on
            # the path as a password instead
            if not results['targets']:
                # No targets to get our password from
                return None

            # We need to requote contents since this variable will get
            # unquoted later on in the process.  This step appears a bit
            # hacky, but it allows us to support the password in this
            # location:
            #  - twist://user@example.com/password
            results['password'] = NotifyTwist.quote(
                results['targets'].pop(0), safe='')

        else:
            # Now we handle our format:
            #   twist://password:email
            #
            # since URL logic expects
            #   schema://user:password@host
            #
            # you can see how this breaks.  The colon at the front
            # delimits passwords and you can see the twist:// url inverts
            # what we expect:
            #   twist://password:user@example.com
            #
            # twist://abc123:bob@example.com using normal conventions would
            # have interpreted 'bob' as the password and 'abc123' as the
            # user.  For the purpose of apprise simplifying this for us, we
            # need to swap these arguments when we prepare the email.
            results['user'], results['password'] = \
                results['password'], results['user']

        # The 'to' makes it easier to use yaml configuration
        if 'to' in results['qsd'] and len(results['qsd']['to']):
            results['targets'] += \
                NotifyTwist.parse_list(results['qsd']['to'])

        return results

    def __del__(self):
        """
        Deconstructor

        Makes a best-effort attempt to log out of the Twist server.
        """
        try:
            self.logout()

        except LookupError:  # pragma: no cover
            # Python v3.5 call to requests can sometimes throw the exception
            #   "/usr/lib64/python3.7/socket.py", line 748, in getaddrinfo
            #   LookupError: unknown encoding: idna
            #
            # This occurs every time when running unit-tests against
            # Apprise:
            # LANG=C.UTF-8 PYTHONPATH=$(pwd) py.test-3.7
            #
            # There has been an open issue on this since Jan 2017.
            #   - https://bugs.python.org/issue29288
            #
            # A ~similar~ issue can be identified here in the requests
            # ticket system as unresolved and has provided work-arounds
            #   - https://github.com/kennethreitz/requests/issues/3578
            pass

        except ImportError:  # pragma: no cover
            # The actual exception is `ModuleNotFoundError` however
            # ImportError grants us backwards compatibility with versions
            # of Python older than v3.6

            # Python code that makes early calls to sys.exit() can cause
            # the __del__() code to run. However in some newer versions of
            # Python, this causes the `sys` library to no longer be
            # available. The stack overflow also goes on to suggest that
            # it's not wise to use the __del__() as a deconstructor
            # which is the case here.

            # https://stackoverflow.com/questions/67218341/\
            #    modulenotfounderror-import-of-time-halted-none-in-sys-\
            #    modules-occured-when-obj?noredirect=1&lq=1
            #
            #
            # Also see: https://stackoverflow.com/questions\
            #    /1481488/what-is-the-del-method-and-how-do-i-call-it

            # At this time it seems clean to try to log out (if we can)
            # but not throw any unnecessary exceptions (like this one) to
            # the end user if we don't have to.
            pass