Merge branch 'development' into morpheus

pull/543/head
Louis Vézina 5 years ago
commit 4b8f64a64d

@ -0,0 +1,474 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this service you will need a D7 Networks account from their website
# at https://d7networks.com/
#
# After you've established your account you can get your api login credentials
# (both user and password) from the API Details section from within your
# account profile area: https://d7networks.com/accounts/profile/
import re
import six
import requests
import base64
from json import dumps
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
# Extend HTTP Error Messages
D7NETWORKS_HTTP_ERROR_MAP = {
401: 'Invalid Argument(s) Specified.',
403: 'Unauthorized - Authentication Failure.',
412: 'A Routing Error Occured',
500: 'A Serverside Error Occured Handling the Request.',
}
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
# Priorities
class D7SMSPriority(object):
"""
D7 Networks SMS Message Priority
"""
LOW = 0
MODERATE = 1
NORMAL = 2
HIGH = 3
D7NETWORK_SMS_PRIORITIES = (
D7SMSPriority.LOW,
D7SMSPriority.MODERATE,
D7SMSPriority.NORMAL,
D7SMSPriority.HIGH,
)
class NotifyD7Networks(NotifyBase):
"""
A wrapper for D7 Networks Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'D7 Networks'
# The services URL
service_url = 'https://d7networks.com/'
# All pushover requests are secure
secure_protocol = 'd7sms'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twilio'
# D7 Networks batch notification URL
notify_batch_url = 'http://rest-api.d7networks.com/secure/sendbatch'
# D7 Networks single notification URL
notify_url = 'http://rest-api.d7networks.com/secure/send'
# The maximum length of the body
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{user}:{password}@{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'user': {
'name': _('Username'),
'type': 'string',
'required': True,
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
'required': True,
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'[0-9\s)(+-]+', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'priority': {
'name': _('Priority'),
'type': 'choice:int',
'min': D7SMSPriority.LOW,
'max': D7SMSPriority.HIGH,
'values': D7NETWORK_SMS_PRIORITIES,
# The website identifies that the default priority is low; so
# this plugin will honor that same default
'default': D7SMSPriority.LOW,
},
'batch': {
'name': _('Batch Mode'),
'type': 'bool',
'default': False,
},
'to': {
'alias_of': 'targets',
},
'source': {
# Originating address,In cases where the rewriting of the sender's
# address is supported or permitted by the SMS-C. This is used to
# transmit the message, this number is transmitted as the
# originating address and is completely optional.
'name': _('Originating Address'),
'type': 'string',
'map_to': 'source',
},
'from': {
'alias_of': 'source',
},
})
def __init__(self, targets=None, priority=None, source=None, batch=False,
**kwargs):
"""
Initialize D7 Networks Object
"""
super(NotifyD7Networks, self).__init__(**kwargs)
# The Priority of the message
if priority not in D7NETWORK_SMS_PRIORITIES:
self.priority = self.template_args['priority']['default']
else:
self.priority = priority
# Prepare Batch Mode Flag
self.batch = batch
# Setup our source address (if defined)
self.source = None \
if not isinstance(source, six.string_types) else source.strip()
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
# if it's less than 10, then we can assume it's
# a poorly specified phone no and spit a warning
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
self.logger.warning(
'Dropped invalid phone # ({}) specified.'.format(target))
if len(self.targets) == 0:
msg = 'There are no valid targets identified to notify.'
self.logger.warning(msg)
raise TypeError(msg)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Depending on whether we are set to batch mode or single mode this
redirects to the appropriate handling
"""
# error tracking (used for function return)
has_error = False
auth = '{user}:{password}'.format(
user=self.user, password=self.password)
if six.PY3:
# Python 3's versio of b64encode() expects a byte array and not
# a string. To accomodate this, we encode the content here
auth = auth.encode('utf-8')
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Accept': 'application/json',
'Authorization': 'Basic {}'.format(base64.b64encode(auth))
}
# Our URL varies depending if we're doing a batch mode or not
url = self.notify_batch_url if self.batch else self.notify_url
# use the list directly
targets = list(self.targets)
while len(targets):
if self.batch:
# Prepare our payload
payload = {
'globals': {
'priority': self.priority,
'from': self.source if self.source else self.app_id,
},
'messages': [{
'to': self.targets,
'content': body,
}],
}
# Reset our targets so we don't keep going. This is required
# because we're in batch mode; we only need to loop once.
targets = []
else:
# We're not in a batch mode; so get our next target
# Get our target(s) to notify
target = targets.pop(0)
# Prepare our payload
payload = {
'priority': self.priority,
'content': body,
'to': target,
'from': self.source if self.source else self.app_id,
}
# Some Debug Logging
self.logger.debug(
'D7 Networks POST URL: {} (cert_verify={})'.format(
url, self.verify_certificate))
self.logger.debug('D7 Networks Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
if r.status_code not in (
requests.codes.created, requests.codes.ok):
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(
r.status_code, D7NETWORKS_HTTP_ERROR_MAP)
try:
# Update our status response if we can
json_response = loads(r.content)
status_str = json_response.get('message', status_str)
except (AttributeError, ValueError):
# could not parse JSON response... just use the status
# we already have.
# AttributeError means r.content was None
pass
self.logger.warning(
'Failed to send D7 Networks SMS notification to {}: '
'{}{}error={}.'.format(
', '.join(target) if self.batch else target,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
if self.batch:
count = len(self.targets)
try:
# Get our message delivery count if we can
json_response = loads(r.content)
count = int(json_response.get(
'data', {}).get('messageCount', -1))
except (AttributeError, ValueError, TypeError):
# could not parse JSON response... just assume
# that our delivery is okay for now
pass
if count != len(self.targets):
has_error = True
self.logger.info(
'Sent D7 Networks batch SMS notification to '
'{} of {} target(s).'.format(
count, len(self.targets)))
else:
self.logger.info(
'Sent D7 Networks SMS notification to {}.'.format(
target))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending D7 Networks:%s ' % (
', '.join(self.targets)) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'batch': 'yes' if self.batch else 'no',
}
if self.priority != self.template_args['priority']['default']:
args['priority'] = str(self.priority)
if self.source:
args['from'] = self.source
return '{schema}://{user}:{password}@{targets}/?{args}'.format(
schema=self.secure_protocol,
user=NotifyD7Networks.quote(self.user, safe=''),
password=NotifyD7Networks.quote(self.password, safe=''),
targets='/'.join(
[NotifyD7Networks.quote(x, safe='') for x in self.targets]),
args=NotifyD7Networks.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Initialize our targets
results['targets'] = list()
# The store our first target stored in the hostname
results['targets'].append(NotifyD7Networks.unquote(results['host']))
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'].extend(
NotifyD7Networks.split_path(results['fullpath']))
# Set our priority
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = {
'l': D7SMSPriority.LOW,
'0': D7SMSPriority.LOW,
'm': D7SMSPriority.MODERATE,
'1': D7SMSPriority.MODERATE,
'n': D7SMSPriority.NORMAL,
'2': D7SMSPriority.NORMAL,
'h': D7SMSPriority.HIGH,
'3': D7SMSPriority.HIGH,
}
try:
results['priority'] = \
_map[results['qsd']['priority'][0].lower()]
except KeyError:
# No priority was set
pass
# Support the 'from' and 'source' variable so that we can support
# targets this way too.
# The 'from' makes it easier to use yaml configuration
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['source'] = \
NotifyD7Networks.unquote(results['qsd']['from'])
if 'source' in results['qsd'] and len(results['qsd']['source']):
results['source'] = \
NotifyD7Networks.unquote(results['qsd']['source'])
# Get Batch Mode Flag
results['batch'] = \
parse_bool(results['qsd'].get('batch', False))
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyD7Networks.parse_list(results['qsd']['to'])
return results

@ -0,0 +1,416 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Sign-up with https://dashboard.nexmo.com/
#
# Get your (api) key and secret here:
# - https://dashboard.nexmo.com/getting-started-guide
#
import re
import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..AppriseLocale import gettext_lazy as _
# Token required as part of the API request
VALIDATE_APIKEY = re.compile(r'^[a-z0-9]{8}$', re.I)
VALIDATE_SECRET = re.compile(r'^[a-z0-9]{16}$', re.I)
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
class NotifyNexmo(NotifyBase):
"""
A wrapper for Nexmo Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Nexmo'
# The services URL
service_url = 'https://dashboard.nexmo.com/'
# The default protocol
secure_protocol = 'nexmo'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_nexmo'
# Nexmo uses the http protocol with JSON requests
notify_url = 'https://rest.nexmo.com/sms/json'
# The maximum length of the body
body_maxlen = 140
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Default Time To Live
# By default Nexmo attempt delivery for 72 hours, however the maximum
# effective value depends on the operator and is typically 24 - 48 hours.
# We recommend this value should be kept at its default or at least 30
# minutes.
default_ttl = 900000
ttl_max = 604800000
ttl_min = 20000
# Define object templates
templates = (
'{schema}://{apikey}:{secret}@{from_phone}',
'{schema}://{apikey}:{secret}@{from_phone}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'required': True,
'regex': (r'AC[a-z0-9]{8}', 'i'),
},
'secret': {
'name': _('API Secret'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'[a-z0-9]{16}', 'i'),
},
'from_phone': {
'name': _('From Phone No'),
'type': 'string',
'required': True,
'regex': (r'\+?[0-9\s)(+-]+', 'i'),
'map_to': 'source',
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'[0-9\s)(+-]+', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'from': {
'alias_of': 'from_phone',
},
'key': {
'alias_of': 'apikey',
},
'secret': {
'alias_of': 'secret',
},
'ttl': {
'name': _('ttl'),
'type': 'int',
'default': 900000,
'min': 20000,
'max': 604800000,
},
})
def __init__(self, apikey, secret, source, targets=None, ttl=None,
**kwargs):
"""
Initialize Nexmo Object
"""
super(NotifyNexmo, self).__init__(**kwargs)
try:
# The Account SID associated with the account
self.apikey = apikey.strip()
except AttributeError:
# Token was None
msg = 'No Nexmo APIKey was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not VALIDATE_APIKEY.match(self.apikey):
msg = 'The Nexmo API Key specified ({}) is invalid.'\
.format(self.apikey)
self.logger.warning(msg)
raise TypeError(msg)
try:
# The Account SID associated with the account
self.secret = secret.strip()
except AttributeError:
# Token was None
msg = 'No Nexmo API Secret was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not VALIDATE_SECRET.match(self.secret):
msg = 'The Nexmo API Secret specified ({}) is invalid.'\
.format(self.secret)
self.logger.warning(msg)
raise TypeError(msg)
# Set our Time to Live Flag
self.ttl = self.default_ttl
try:
self.ttl = int(ttl)
except (ValueError, TypeError):
# Do nothing
pass
if self.ttl < self.ttl_min or self.ttl > self.ttl_max:
msg = 'The Nexmo TTL specified ({}) is out of range.'\
.format(self.ttl)
self.logger.warning(msg)
raise TypeError(msg)
# The Source Phone #
self.source = source
if not IS_PHONE_NO.match(self.source):
msg = 'The Account (From) Phone # specified ' \
'({}) is invalid.'.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Tidy source
self.source = re.sub(r'[^\d]+', '', self.source)
if len(self.source) < 11 or len(self.source) > 14:
msg = 'The Account (From) Phone # specified ' \
'({}) contains an invalid digit count.'.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Nexmo Notification
"""
# error tracking (used for function return)
has_error = False
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded',
}
# Prepare our payload
payload = {
'api_key': self.apikey,
'api_secret': self.secret,
'ttl': self.ttl,
'from': self.source,
'text': body,
# The to gets populated in the loop below
'to': None,
}
# Create a copy of the targets list
targets = list(self.targets)
if len(targets) == 0:
# No sources specified, use our own phone no
targets.append(self.source)
while len(targets):
# Get our target to notify
target = targets.pop(0)
# Prepare our user
payload['to'] = target
# Some Debug Logging
self.logger.debug('Nexmo POST URL: {} (cert_verify={})'.format(
self.notify_url, self.verify_certificate))
self.logger.debug('Nexmo Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyNexmo.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send Nexmo notification to {}: '
'{}{}error={}.'.format(
target,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info('Sent Nexmo notification to %s.' % target)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Nexmo:%s '
'notification.' % target
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'ttl': str(self.ttl),
}
return '{schema}://{key}:{secret}@{source}/{targets}/?{args}'.format(
schema=self.secure_protocol,
key=self.apikey,
secret=self.secret,
source=NotifyNexmo.quote(self.source, safe=''),
targets='/'.join(
[NotifyNexmo.quote(x, safe='') for x in self.targets]),
args=NotifyNexmo.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = NotifyNexmo.split_path(results['fullpath'])
# The hostname is our source number
results['source'] = NotifyNexmo.unquote(results['host'])
# Get our account_side and auth_token from the user/pass config
results['apikey'] = NotifyNexmo.unquote(results['user'])
results['secret'] = NotifyNexmo.unquote(results['password'])
# API Key
if 'key' in results['qsd'] and len(results['qsd']['key']):
# Extract the API Key from an argument
results['apikey'] = \
NotifyNexmo.unquote(results['qsd']['key'])
# API Secret
if 'secret' in results['qsd'] and len(results['qsd']['secret']):
# Extract the API Secret from an argument
results['secret'] = \
NotifyNexmo.unquote(results['qsd']['secret'])
# Support the 'from' and 'source' variable so that we can support
# targets this way too.
# The 'from' makes it easier to use yaml configuration
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['source'] = \
NotifyNexmo.unquote(results['qsd']['from'])
if 'source' in results['qsd'] and len(results['qsd']['source']):
results['source'] = \
NotifyNexmo.unquote(results['qsd']['source'])
# Support the 'ttl' variable
if 'ttl' in results['qsd'] and len(results['qsd']['ttl']):
results['ttl'] = \
NotifyNexmo.unquote(results['qsd']['ttl'])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyNexmo.parse_list(results['qsd']['to'])
return results

@ -0,0 +1,225 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this plugin, you need to download the app
# - Apple: https://itunes.apple.com/us/app/\
# push-by-techulus/id1444391917?ls=1&mt=8
# - Android: https://play.google.com/store/apps/\
# details?id=com.techulus.push
#
# You have to sign up through the account via your mobile device.
#
# Once you've got your account, you can get your API key from here:
# https://push.techulus.com/login.html
#
# You can also just get the {apikey} right out of the phone app that is
# installed.
#
# your {apikey} will look something like:
# b444a40f-3db9-4224-b489-9a514c41c009
#
# You will need to assemble all of your URLs for this plugin to work as:
# push://{apikey}
#
# Resources
# - https://push.techulus.com/ - Main Website
# - https://pushtechulus.docs.apiary.io - API Documentation
import re
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..AppriseLocale import gettext_lazy as _
# Token required as part of the API request
# Used to prepare our UUID regex matching
UUID4_RE = \
r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}'
# API Key
VALIDATE_APIKEY = re.compile(UUID4_RE, re.I)
class NotifyTechulusPush(NotifyBase):
"""
A wrapper for Techulus Push Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Techulus Push'
# The services URL
service_url = 'https://push.techulus.com'
# The default secure protocol
secure_protocol = 'push'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_techulus'
# Techulus Push uses the http protocol with JSON requests
notify_url = 'https://push.techulus.com/api/v1/notify'
# The maximum allowable characters allowed in the body per message
body_maxlen = 1000
# Define object templates
templates = (
'{schema}://{apikey}',
)
# Define our template apikeys
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
'regex': (UUID4_RE, 'i'),
},
})
def __init__(self, apikey, **kwargs):
"""
Initialize Techulus Push Object
"""
super(NotifyTechulusPush, self).__init__(**kwargs)
if not apikey:
msg = 'The Techulus Push apikey is not specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not VALIDATE_APIKEY.match(apikey.strip()):
msg = 'The Techulus Push apikey specified ({}) is invalid.'\
.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# The apikey associated with the account
self.apikey = apikey.strip()
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Techulus Push Notification
"""
# Setup our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
'x-api-key': self.apikey,
}
payload = {
'title': title,
'body': body,
}
self.logger.debug('Techulus Push POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('Techulus Push Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
if r.status_code not in (
requests.codes.ok, requests.codes.no_content):
# We had a problem
status_str = \
NotifyTechulusPush.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send Techulus Push notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
else:
self.logger.info(
'Sent Techulus Push notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Techulus Push '
'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
return '{schema}://{apikey}/?{args}'.format(
schema=self.secure_protocol,
apikey=NotifyTechulusPush.quote(self.apikey, safe=''),
args=NotifyTechulusPush.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The first apikey is stored in the hostname
results['apikey'] = NotifyTechulusPush.unquote(results['host'])
return results

@ -0,0 +1,805 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# All of the documentation needed to work with the Twist API can be found
# here: https://developer.twist.com/v3/
import re
import requests
from json import loads
from itertools import chain
from .NotifyBase import NotifyBase
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import parse_list
from ..utils import GET_EMAIL_RE
from ..AppriseLocale import gettext_lazy as _
# A workspace can also be interpreted as a team name too!
IS_CHANNEL = re.compile(
r'^#?(?P<name>((?P<workspace>[A-Za-z0-9_-]+):)?'
r'(?P<channel>[^\s]{1,64}))$')
IS_CHANNEL_ID = re.compile(
r'^(?P<name>((?P<workspace>[0-9]+):)?(?P<channel>[0-9]+))$')
# Used to break apart list of potential tags by their delimiter
# into a usable list.
LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
class NotifyTwist(NotifyBase):
"""
A wrapper for Notify Twist Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Twist'
# The services URL
service_url = 'https://twist.com'
# The default secure protocol
secure_protocol = 'twist'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twist'
# The maximum size of the message
body_maxlen = 1000
# Default to markdown
notify_format = NotifyFormat.MARKDOWN
# The default Notification URL to use
api_url = 'https://api.twist.com/api/v3/'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.2
# The default channel to notify if no targets are specified
default_notification_channel = 'general'
# Define object templates
templates = (
'{schema}://{password}:{email}',
'{schema}://{password}:{email}/{targets}',
)
# Define our template arguments
template_tokens = dict(NotifyBase.template_tokens, **{
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
'email': {
'name': _('Email'),
'type': 'string',
},
'target_channel': {
'name': _('Target Channel'),
'type': 'string',
'prefix': '#',
'map_to': 'targets',
},
'target_channel_id': {
'name': _('Target Channel ID'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
})
def __init__(self, email=None, targets=None, **kwargs):
"""
Initialize Notify Twist Object
"""
super(NotifyTwist, self).__init__(**kwargs)
# Initialize channels list
self.channels = set()
# Initialize Channel ID which are stored as:
# <workspace_id>:<channel_id>
self.channel_ids = set()
# Initialize our Email Object
self.email = email if email else '{}@{}'.format(
self.user,
self.host,
)
# The token is None if we're not logged in and False if we
# failed to log in. Otherwise it is set to the actual token
self.token = None
# Our default workspace (associated with our token)
self.default_workspace = None
# A set of all of the available workspaces
self._cached_workspaces = set()
# A mapping of channel names, the layout is as follows:
# {
# <workspace_id>: {
# <channel_name>: <channel_id>,
# <channel_name>: <channel_id>,
# ...
# },
# <workspace2_id>: {
# <channel_name>: <channel_id>,
# <channel_name>: <channel_id>,
# ...
# },
# }
self._cached_channels = dict()
try:
result = GET_EMAIL_RE.match(self.email)
if not result:
# let outer exception handle this
raise TypeError
if email:
# Force user/host to be that of the defined email for
# consistency. This is very important for those initializing
# this object with the the email object would could potentially
# cause inconsistency to contents in the NotifyBase() object
self.user = result.group('fulluser')
self.host = result.group('domain')
except (TypeError, AttributeError):
msg = 'The Twist Auth email specified ({}) is invalid.'\
.format(self.email)
self.logger.warning(msg)
raise TypeError(msg)
if not self.password:
msg = 'No Twist password was specified with account: {}'\
.format(self.email)
self.logger.warning(msg)
raise TypeError(msg)
# Validate recipients and drop bad ones:
for recipient in parse_list(targets):
result = IS_CHANNEL_ID.match(recipient)
if result:
# store valid channel id
self.channel_ids.add(result.group('name'))
continue
result = IS_CHANNEL.match(recipient)
if result:
# store valid device
self.channels.add(result.group('name').lower())
continue
self.logger.warning(
'Dropped invalid channel/id '
'({}) specified.'.format(recipient),
)
if len(self.channels) + len(self.channel_ids) == 0:
# Notify our default channel
self.channels.add(self.default_notification_channel)
self.logger.warning(
'Added default notification channel {}'.format(
self.default_notification_channel))
return
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
return '{schema}://{password}:{user}@{host}/{targets}/?{args}'.format(
schema=self.secure_protocol,
password=self.quote(self.password, safe=''),
user=self.quote(self.user, safe=''),
host=self.host,
targets='/'.join(
[NotifyTwist.quote(x, safe='') for x in chain(
# Channels are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.channels],
# Channel IDs
self.channel_ids,
)]),
args=NotifyTwist.urlencode(args),
)
def login(self):
"""
A simple wrapper to authenticate with the Twist Server
"""
# Prepare our payload
payload = {
'email': self.email,
'password': self.password,
}
# Reset our default workspace
self.default_workspace = None
# Reset our cached objects
self._cached_workspaces = set()
self._cached_channels = dict()
# Send Login Information
postokay, response = self._fetch(
'users/login',
payload=payload,
# We set this boolean so internal recursion doesn't take place.
login=True,
)
if not postokay or not response:
# Setting this variable to False as a way of letting us know
# we failed to authenticate on our last attempt
self.token = False
return False
# Our response object looks like this (content has been altered for
# presentation purposes):
# {
# "contact_info": null,
# "profession": null,
# "timezone": "UTC",
# "avatar_id": null,
# "id": 123456,
# "first_name": "Jordan",
# "comet_channel":
# "124371-34be423219130343030d4ec0a3dabbbbbe565eee",
# "restricted": false,
# "default_workspace": 92020,
# "snooze_dnd_end": null,
# "email": "user@example.com",
# "comet_server": "https://comet.twist.com",
# "snooze_until": null,
# "lang": "en",
# "feature_flags": [],
# "short_name": "Jordan P.",
# "away_mode": null,
# "time_format": "12",
# "client_id": "cb01f37e-a5b2-13e9-ba2a-023a33d10dc0",
# "removed": false,
# "emails": [
# {
# "connected": [],
# "email": "user@example.com",
# "primary": true
# }
# ],
# "scheduled_banners": [
# "threads_3",
# "threads_1",
# "notification_permissions",
# "search_1",
# "messages_1",
# "team_1",
# "inbox_2",
# "inbox_1"
# ],
# "snooze_dnd_start": null,
# "name": "Jordan Peterson",
# "off_days": [],
# "bot": false,
# "token": "2e82c1e4e8b0091fdaa34ff3972351821406f796",
# "snoozed": false,
# "setup_pending": false,
# "date_format": "MM/DD/YYYY"
# }
# Store our default workspace
self.default_workspace = response.get('default_workspace')
# Acquire our token
self.token = response.get('token')
self.logger.info('Authenticated to Twist as {}'.format(self.email))
return True
def logout(self):
"""
A simple wrapper to log out of the server
"""
if not self.token:
# Nothing more to do
return True
# Send Logout Message
postokay, response = self._fetch('users/logout')
# reset our token
self.token = None
# There is no need to handling failed log out attempts at this time
return True
def get_workspaces(self):
"""
Returns all workspaces associated with this user account as a set
This returned object is either an empty dictionary or one that
looks like this:
{
'workspace': <workspace_id>,
'workspace': <workspace_id>,
'workspace': <workspace_id>,
}
All workspaces are made lowercase for comparison purposes
"""
if not self.token and not self.login():
# Nothing more to do
return dict()
postokay, response = self._fetch('workspaces/get')
if not postokay or not response:
# We failed to retrieve
return dict()
# The response object looks like so:
# [
# {
# "created_ts": 1563044447,
# "name": "apprise",
# "creator": 123571,
# "color": 1,
# "default_channel": 13245,
# "plan": "free",
# "default_conversation": 63022,
# "id": 12345
# }
# ]
# Knowing our response, we can iterate over each object and cache our
# object
result = {}
for entry in response:
result[entry.get('name', '').lower()] = entry.get('id', '')
return result
def get_channels(self, wid):
"""
Simply returns the channel objects associated with the specified
workspace id.
This returned object is either an empty dictionary or one that
looks like this:
{
'channel1': <channel_id>,
'channel2': <channel_id>,
'channel3': <channel_id>,
}
All channels are made lowercase for comparison purposes
"""
if not self.token and not self.login():
# Nothing more to do
return {}
payload = {'workspace_id': wid}
postokay, response = self._fetch(
'channels/get', payload=payload)
if not postokay or not isinstance(response, list):
# We failed to retrieve
return {}
# Response looks like this:
# [
# {
# "id": 123,
# "name": "General"
# "workspace_id": 12345,
# "color": 1,
# "description": "",
# "archived": false,
# "public": true,
# "user_ids": [
# 8754
# ],
# "created_ts": 1563044447,
# "creator": 123571,
# }
# ]
#
# Knowing our response, we can iterate over each object and cache our
# object
result = {}
for entry in response:
result[entry.get('name', '').lower()] = entry.get('id', '')
return result
def _channel_migration(self):
"""
A simple wrapper to get all of the current workspaces including
the default one. This plays a role in what channel(s) get notified
and where.
A cache lookup has overhead, and is only required to be preformed
if the user specified channels by their string value
"""
if not self.token and not self.login():
# Nothing more to do
return False
if not len(self.channels):
# Nothing to do; take an early exit
return True
if self.default_workspace \
and self.default_workspace not in self._cached_channels:
# Get our default workspace entries
self._cached_channels[self.default_workspace] = \
self.get_channels(self.default_workspace)
# initialize our error tracking
has_error = False
while len(self.channels):
# Pop our channel off of the stack
result = IS_CHANNEL.match(self.channels.pop())
# Populate our key variables
workspace = result.group('workspace')
channel = result.group('channel').lower()
# Acquire our workspace_id if we can
if workspace:
# We always work with the workspace in it's lowercase form
workspace = workspace.lower()
# A workspace was defined
if not len(self._cached_workspaces):
# cache our workspaces; this only needs to be done once
self._cached_workspaces = self.get_workspaces()
if workspace not in self._cached_workspaces:
# not found
self.logger.warning(
'The Twist User {} is not associated with the '
'Team {}'.format(self.email, workspace))
# Toggle our return flag
has_error = True
continue
# Store the workspace id
workspace_id = self._cached_workspaces[workspace]
else:
# use default workspace
workspace_id = self.default_workspace
# Check to see if our channel exists in our default workspace
if workspace_id in self._cached_channels \
and channel in self._cached_channels[workspace_id]:
# Store our channel ID
self.channel_ids.add('{}:{}'.format(
workspace_id,
self._cached_channels[workspace_id][channel],
))
continue
# if we reach here, we failed to add our channel
self.logger.warning(
'The Channel #{} was not found{}.'.format(
channel,
'' if not workspace
else ' with Team {}'.format(workspace),
))
# Toggle our return flag
has_error = True
continue
# There is no need to handling failed log out attempts at this time
return not has_error
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twist Notification
"""
# error tracking (used for function return)
has_error = False
if not self.token and not self.login():
# We failed to authenticate - we're done
return False
if len(self.channels) > 0:
# Converts channels to their maped IDs if found; this is the only
# way to send notifications to Twist
self._channel_migration()
if not len(self.channel_ids):
# We have nothing to notify
return False
# Notify all of our identified channels
ids = list(self.channel_ids)
while len(ids) > 0:
# Retrieve our Channel Object
result = IS_CHANNEL_ID.match(ids.pop())
# We need both the workspace/team id and channel id
channel_id = int(result.group('channel'))
# Prepare our payload
payload = {
'channel_id': channel_id,
'title': title,
'content': body,
}
postokay, response = self._fetch(
'threads/add',
payload=payload,
)
# only toggle has_error flag if we had an error
if not postokay:
# Mark our failure
has_error = True
continue
# If we reach here, we were successful
self.logger.info(
'Sent Twist notification to {}.'.format(
result.group('name')))
return not has_error
def _fetch(self, url, payload=None, method='POST', login=False):
"""
Wrapper to Twist API requests object
"""
# use what was specified, otherwise build headers dynamically
headers = {
'User-Agent': self.app_id,
}
headers['Content-Type'] = \
'application/x-www-form-urlencoded; charset=utf-8'
if self.token:
# Set our token
headers['Authorization'] = 'Bearer {}'.format(self.token)
# Prepare our api url
api_url = '{}{}'.format(self.api_url, url)
# Some Debug Logging
self.logger.debug('Twist {} URL: {} (cert_verify={})'.format(
method, api_url, self.verify_certificate))
self.logger.debug('Twist Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made;
self.throttle()
# Initialize a default value for our content value
content = {}
# acquire our request mode
fn = requests.post if method == 'POST' else requests.get
try:
r = fn(
api_url,
data=payload,
headers=headers,
verify=self.verify_certificate)
# Get our JSON content if it's possible
try:
content = loads(r.content)
except (TypeError, ValueError, AttributeError):
# TypeError = r.content is not a String
# ValueError = r.content is Unparsable
# AttributeError = r.content is None
content = {}
# handle authentication errors where our token has just simply
# expired. The error response content looks like this:
# {
# "error_code": 200,
# "error_uuid": "af80bd0715434231a649f2258d7fb946",
# "error_extra": {},
# "error_string": "Invalid token"
# }
#
# Authentication related codes:
# 120 = You are not logged in
# 200 = Invalid Token
#
# Source: https://developer.twist.com/v3/#errors
#
# We attempt to login again and retry the original request
# if we aren't in the process of handling a login already
if r.status_code != requests.codes.ok and login is False \
and isinstance(content, dict) and \
content.get('error_code') in (120, 200):
# We failed to authenticate with our token; login one more
# time and retry this original request
if self.login():
r = fn(
api_url,
data=payload,
headers=headers,
verify=self.verify_certificate)
# Get our JSON content if it's possible
try:
content = loads(r.content)
except (TypeError, ValueError, AttributeError):
# TypeError = r.content is not a String
# ValueError = r.content is Unparsable
# AttributeError = r.content is None
content = {}
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyTwist.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Twist {} to {}: '
'{}error={}.'.format(
method,
api_url,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
return (False, content)
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending Twist {} to {}: '.
format(method, api_url))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
return (False, content)
return (True, content)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
if not results.get('user'):
# A username is required
return None
# Acquire our targets
results['targets'] = NotifyTwist.split_path(results['fullpath'])
if not results.get('password'):
# Password is required; we will accept the very first entry on the
# path as a password instead
if len(results['targets']) == 0:
# No targets to get our password from
return None
# We need to requote contents since this variable will get
# unquoted later on in the process. This step appears a bit
# hacky, but it allows us to support the password in this location
# - twist://user@example.com/password
results['password'] = NotifyTwist.quote(
results['targets'].pop(0), safe='')
else:
# Now we handle our format:
# twist://password:email
#
# since URL logic expects
# schema://user:password@host
#
# you can see how this breaks. The colon at the front delmits
# passwords and you can see the twist:// url inverts what we
# expect:
# twist://password:user@example.com
#
# twist://abc123:bob@example.com using normal conventions would
# have interpreted 'bob' as the password and 'abc123' as the user.
# For the purpose of apprise simplifying this for us, we need to
# swap these arguments when we prepare the email.
_password = results['user']
results['user'] = results['password']
results['password'] = _password
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyTwist.parse_list(results['qsd']['to'])
return results
def __del__(self):
"""
Deconstructor
"""
try:
self.logout()
except LookupError:
# Python v3.5 call to requests can sometimes throw the exception
# "/usr/lib64/python3.7/socket.py", line 748, in getaddrinfo
# LookupError: unknown encoding: idna
#
# This occurs every time when running unit-tests against Apprise:
# LANG=C.UTF-8 PYTHONPATH=$(pwd) py.test-3.7
#
# There has been an open issue on this since Jan 2017.
# - https://bugs.python.org/issue29288
#
# A ~similar~ issue can be identified here in the requests
# ticket system as unresolved and has provided work-arounds
# - https://github.com/kennethreitz/requests/issues/3578
pass

@ -0,0 +1,654 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# See https://developer.twitter.com/en/docs/direct-messages/\
# sending-and-receiving/api-reference/new-event.html
import re
import six
import requests
from datetime import datetime
from requests_oauthlib import OAuth1
from json import dumps
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
IS_USER = re.compile(r'^\s*@?(?P<user>[A-Z0-9_]+)$', re.I)
class TwitterMessageMode(object):
"""
Twitter Message Mode
"""
# DM (a Direct Message)
DM = 'dm'
# A Public Tweet
TWEET = 'tweet'
# Define the types in a list for validation purposes
TWITTER_MESSAGE_MODES = (
TwitterMessageMode.DM,
TwitterMessageMode.TWEET,
)
class NotifyTwitter(NotifyBase):
"""
A wrapper to Twitter Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Twitter'
# The services URL
service_url = 'https://twitter.com/'
# The default secure protocol is twitter. 'tweet' is left behind
# for backwards compatibility of older apprise usage
secure_protocol = ('twitter', 'tweet')
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twitter'
# Do not set body_maxlen as it is set in a property value below
# since the length varies depending if we are doing a direct message
# or a tweet
# body_maxlen = see below @propery defined
# Twitter does have titles when creating a message
title_maxlen = 0
# Twitter API
twitter_api = 'api.twitter.com'
# Twitter API Reference To Acquire Someone's Twitter ID
twitter_lookup = 'https://api.twitter.com/1.1/users/lookup.json'
# Twitter API Reference To Acquire Current Users Information
twitter_whoami = \
'https://api.twitter.com/1.1/account/verify_credentials.json'
# Twitter API Reference To Send A Private DM
twitter_dm = 'https://api.twitter.com/1.1/direct_messages/events/new.json'
# Twitter API Reference To Send A Public Tweet
twitter_tweet = 'https://api.twitter.com/1.1/statuses/update.json'
# Twitter is kind enough to return how many more requests we're allowed to
# continue to make within it's header response as:
# X-Rate-Limit-Reset: The epoc time (in seconds) we can expect our
# rate-limit to be reset.
# X-Rate-Limit-Remaining: an integer identifying how many requests we're
# still allow to make.
request_rate_per_sec = 0
# For Tracking Purposes
ratelimit_reset = datetime.utcnow()
# Default to 1000; users can send up to 1000 DM's and 2400 tweets a day
# This value only get's adjusted if the server sets it that way
ratelimit_remaining = 1
templates = (
'{schema}://{ckey}/{csecret}/{akey}/{asecret}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'ckey': {
'name': _('Consumer Key'),
'type': 'string',
'private': True,
'required': True,
},
'csecret': {
'name': _('Consumer Secret'),
'type': 'string',
'private': True,
'required': True,
},
'akey': {
'name': _('Access Key'),
'type': 'string',
'private': True,
'required': True,
},
'asecret': {
'name': _('Access Secret'),
'type': 'string',
'private': True,
'required': True,
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'prefix': '@',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'mode': {
'name': _('Message Mode'),
'type': 'choice:string',
'values': TWITTER_MESSAGE_MODES,
'default': TwitterMessageMode.DM,
},
'cache': {
'name': _('Cache Results'),
'type': 'bool',
'default': True,
},
'to': {
'alias_of': 'targets',
},
})
def __init__(self, ckey, csecret, akey, asecret, targets=None,
mode=TwitterMessageMode.DM, cache=True, **kwargs):
"""
Initialize Twitter Object
"""
super(NotifyTwitter, self).__init__(**kwargs)
if not ckey:
msg = 'An invalid Consumer API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not csecret:
msg = 'An invalid Consumer Secret API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not akey:
msg = 'An invalid Access Token API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not asecret:
msg = 'An invalid Access Token Secret API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Store our webhook mode
self.mode = None \
if not isinstance(mode, six.string_types) else mode.lower()
# Set Cache Flag
self.cache = cache
if self.mode not in TWITTER_MESSAGE_MODES:
msg = 'The Twitter message mode specified ({}) is invalid.' \
.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Identify our targets
self.targets = []
for target in parse_list(targets):
match = IS_USER.match(target)
if match and match.group('user'):
self.targets.append(match.group('user'))
continue
self.logger.warning(
'Dropped invalid user ({}) specified.'.format(target),
)
# Store our data
self.ckey = ckey
self.csecret = csecret
self.akey = akey
self.asecret = asecret
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twitter Notification
"""
# Call the _send_ function applicable to whatever mode we're in
# - calls _send_tweet if the mode is set so
# - calls _send_dm (direct message) otherwise
return getattr(self, '_send_{}'.format(self.mode))(
body=body, title=title, notify_type=notify_type, **kwargs)
def _send_tweet(self, body, title='', notify_type=NotifyType.INFO,
**kwargs):
"""
Twitter Public Tweet
"""
payload = {
'status': body,
}
# Send Tweet
postokay, response = self._fetch(
self.twitter_tweet,
payload=payload,
json=False,
)
if postokay:
self.logger.info(
'Sent Twitter notification as public tweet.')
return postokay
def _send_dm(self, body, title='', notify_type=NotifyType.INFO,
**kwargs):
"""
Twitter Direct Message
"""
# Error Tracking
has_error = False
payload = {
'event': {
'type': 'message_create',
'message_create': {
'target': {
# This gets assigned
'recipient_id': None,
},
'message_data': {
'text': body,
}
}
}
}
# Lookup our users
targets = self._whoami(lazy=self.cache) if not len(self.targets) \
else self._user_lookup(self.targets, lazy=self.cache)
if not targets:
# We failed to lookup any users
self.logger.warning(
'Failed to acquire user(s) to Direct Message via Twitter')
return False
for screen_name, user_id in targets.items():
# Assign our user
payload['event']['message_create']['target']['recipient_id'] = \
user_id
# Send Twitter DM
postokay, response = self._fetch(
self.twitter_dm,
payload=payload,
)
if not postokay:
# Track our error
has_error = True
continue
self.logger.info(
'Sent Twitter DM notification to @{}.'.format(screen_name))
return not has_error
def _whoami(self, lazy=True):
"""
Looks details of current authenticated user
"""
# Prepare a whoami key; this is to prevent conflict with other
# NotifyTwitter declarations that may or may not use a different
# set of authentication keys
whoami_key = '{}{}{}{}'.format(
self.ckey, self.csecret, self.akey, self.asecret)
if lazy and hasattr(NotifyTwitter, '_whoami_cache') \
and whoami_key in getattr(NotifyTwitter, '_whoami_cache'):
# Use cached response
return getattr(NotifyTwitter, '_whoami_cache')[whoami_key]
# Contains a mapping of screen_name to id
results = {}
# Send Twitter DM
postokay, response = self._fetch(
self.twitter_whoami,
method='GET',
json=False,
)
if postokay:
try:
results[response['screen_name']] = response['id']
if lazy:
# Cache our response for future references
if not hasattr(NotifyTwitter, '_whoami_cache'):
setattr(
NotifyTwitter, '_whoami_cache',
{whoami_key: results})
else:
getattr(NotifyTwitter, '_whoami_cache')\
.update({whoami_key: results})
# Update our user cache as well
if not hasattr(NotifyTwitter, '_user_cache'):
setattr(NotifyTwitter, '_user_cache', results)
else:
getattr(NotifyTwitter, '_user_cache').update(results)
except (TypeError, KeyError):
pass
return results
def _user_lookup(self, screen_name, lazy=True):
"""
Looks up a screen name and returns the user id
the screen_name can be a list/set/tuple as well
"""
# Contains a mapping of screen_name to id
results = {}
# Build a unique set of names
names = parse_list(screen_name)
if lazy and hasattr(NotifyTwitter, '_user_cache'):
# Use cached response
results = {k: v for k, v in getattr(
NotifyTwitter, '_user_cache').items() if k in names}
# limit our names if they already exist in our cache
names = [name for name in names if name not in results]
if not len(names):
# They're is nothing further to do
return results
# Twitters API documents that it can lookup to 100
# results at a time.
# https://developer.twitter.com/en/docs/accounts-and-users/\
# follow-search-get-users/api-reference/get-users-lookup
for i in range(0, len(names), 100):
# Send Twitter DM
postokay, response = self._fetch(
self.twitter_lookup,
payload={
'screen_name': names[i:i + 100],
},
json=False,
)
if not postokay or not isinstance(response, list):
# Track our error
continue
# Update our user index
for entry in response:
try:
results[entry['screen_name']] = entry['id']
except (TypeError, KeyError):
pass
# Cache our response for future use; this saves on un-nessisary extra
# hits against the Twitter API when we already know the answer
if lazy:
if not hasattr(NotifyTwitter, '_user_cache'):
setattr(NotifyTwitter, '_user_cache', results)
else:
getattr(NotifyTwitter, '_user_cache').update(results)
return results
def _fetch(self, url, payload=None, method='POST', json=True):
"""
Wrapper to Twitter API requests object
"""
headers = {
'Host': self.twitter_api,
'User-Agent': self.app_id,
}
if json:
headers['Content-Type'] = 'application/json'
payload = dumps(payload)
auth = OAuth1(
self.ckey,
client_secret=self.csecret,
resource_owner_key=self.akey,
resource_owner_secret=self.asecret,
)
# Some Debug Logging
self.logger.debug('Twitter {} URL: {} (cert_verify={})'.format(
method, url, self.verify_certificate))
self.logger.debug('Twitter Payload: %s' % str(payload))
# By default set wait to None
wait = None
if self.ratelimit_remaining == 0:
# Determine how long we should wait for or if we should wait at
# all. This isn't fool-proof because we can't be sure the client
# time (calling this script) is completely synced up with the
# Gitter server. One would hope we're on NTP and our clocks are
# the same allowing this to role smoothly:
now = datetime.utcnow()
if now < self.ratelimit_reset:
# We need to throttle for the difference in seconds
# We add 0.5 seconds to the end just to allow a grace
# period.
wait = (self.ratelimit_reset - now).total_seconds() + 0.5
# Default content response object
content = {}
# Always call throttle before any remote server i/o is made;
self.throttle(wait=wait)
# acquire our request mode
fn = requests.post if method == 'POST' else requests.get
try:
r = fn(
url,
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyTwitter.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Twitter {} to {}: '
'{}error={}.'.format(
method,
url,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
return (False, content)
try:
content = loads(r.content)
except (TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
content = {}
try:
# Capture rate limiting if possible
self.ratelimit_remaining = \
int(r.headers.get('x-rate-limit-remaining'))
self.ratelimit_reset = datetime.utcfromtimestamp(
int(r.headers.get('x-rate-limit-reset')))
except (TypeError, ValueError):
# This is returned if we could not retrieve this information
# gracefully accept this state and move on
pass
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending Twitter {} to {}: '.
format(method, url))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
return (False, content)
return (True, content)
@property
def body_maxlen(self):
"""
The maximum allowable characters allowed in the body per message
This is used during a Private DM Message Size (not Public Tweets
which are limited to 280 characters)
"""
return 10000 if self.mode == TwitterMessageMode.DM else 280
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'mode': self.mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if len(self.targets) > 0:
args['to'] = ','.join([NotifyTwitter.quote(x, safe='')
for x in self.targets])
return '{schema}://{ckey}/{csecret}/{akey}/{asecret}' \
'/{targets}/?{args}'.format(
schema=self.secure_protocol[0],
ckey=NotifyTwitter.quote(self.ckey, safe=''),
asecret=NotifyTwitter.quote(self.csecret, safe=''),
akey=NotifyTwitter.quote(self.akey, safe=''),
csecret=NotifyTwitter.quote(self.asecret, safe=''),
targets='/'.join(
[NotifyTwitter.quote('@{}'.format(target), safe='')
for target in self.targets]),
args=NotifyTwitter.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The first token is stored in the hostname
consumer_key = NotifyTwitter.unquote(results['host'])
# Acquire remaining tokens
tokens = NotifyTwitter.split_path(results['fullpath'])
# Now fetch the remaining tokens
try:
consumer_secret, access_token_key, access_token_secret = \
tokens[0:3]
except (ValueError, AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
consumer_secret = None
access_token_key = None
access_token_secret = None
results['ckey'] = consumer_key
results['csecret'] = consumer_secret
results['akey'] = access_token_key
results['asecret'] = access_token_secret
# The defined twitter mode
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = \
NotifyTwitter.unquote(results['qsd']['mode'])
results['targets'] = []
# if a user has been defined, add it to the list of targets
if results.get('user'):
results['targets'].append(results.get('user'))
# Store any remaining items as potential targets
results['targets'].extend(tokens[3:])
if 'cache' in results['qsd'] and len(results['qsd']['cache']):
results['cache'] = \
parse_bool(results['qsd']['cache'], True)
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyTwitter.parse_list(results['qsd']['to'])
if results.get('schema', 'twitter').lower() == 'tweet':
# Deprication Notice issued for v0.7.9
NotifyTwitter.logger.deprecate(
'tweet:// has been replaced by twitter://')
return results

@ -0,0 +1,398 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this plugin, you must have a ZulipChat bot defined; See here:
# https://zulipchat.com/help/add-a-bot-or-integration
#
# At the time of writing this plugin the instructions were:
# 1. From your desktop, click on the gear icon in the upper right corner.
# 2. Select Settings.
# 3. On the left, click Your bots.
# 4. Click Add a new bot.
# 5. Fill out the fields, and click Create bot.
# If you know your organization {ID} (as it's part of the zulipchat.com url
# after you signup, then you can also access your bot information by visting:
# https://ID.zulipchat.com/#settings/your-bots
# For example, I create an organization called apprise. Thus my URL would be
# https://apprise.zulipchat.com/#settings/your-bots
# When you're done and have a bot, it's important to remember the username
# you provided the bot and the API key generated.
#
# If your {user} was : goober-bot@apprise.zulipchat.com
# and your {apikey} was: lqn6mpwpam6VZzbCW0o7olmk3hwbQSK
#
# Then the following URLs would be accepted by Apprise:
# - zulip://goober-bot@apprise.zulipchat.com/lqn6mpwpam6VZzbCW0o7olmk3hwbQSK
# - zulip://goober-bot@apprise/lqn6mpwpam6VZzbCW0o7olmk3hwbQSK
# - zulip://goober@apprise/lqn6mpwpam6VZzbCW0o7olmk3hwbQSK
# - zulip://goober@apprise.zulipchat.com/lqn6mpwpam6VZzbCW0o7olmk3hwbQSK
# The API reference used to build this plugin was documented here:
# https://zulipchat.com/api/send-message
#
import re
import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import GET_EMAIL_RE
from ..AppriseLocale import gettext_lazy as _
# A Valid Bot Name
VALIDATE_BOTNAME = re.compile(r'(?P<name>[A-Z0-9_]{1,32})(-bot)?', re.I)
# A Valid Bot Token is 32 characters of alpha/numeric
VALIDATE_TOKEN = re.compile(r'[A-Z0-9]{32}', re.I)
# Organization required as part of the API request
VALIDATE_ORG = re.compile(
r'(?P<org>[A-Z0-9_-]{1,32})(\.(?P<hostname>[^\s]+))?', re.I)
# Extend HTTP Error Messages
ZULIP_HTTP_ERROR_MAP = {
401: 'Unauthorized - Invalid Token.',
}
# Used to break path apart into list of channels
TARGET_LIST_DELIM = re.compile(r'[ \t\r\n,#\\/]+')
# Used to detect a channel
IS_VALID_TARGET_RE = re.compile(
r'#?(?P<channel>[A-Z0-9_]{1,32})', re.I)
class NotifyZulip(NotifyBase):
"""
A wrapper for Zulip Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Zulip'
# The services URL
service_url = 'https://zulipchat.com/'
# The default secure protocol
secure_protocol = 'zulip'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_zulip'
# Zulip uses the http protocol with JSON requests
notify_url = 'https://{org}.{hostname}/api/v1/messages'
# The maximum allowable characters allowed in the title per message
title_maxlen = 60
# The maximum allowable characters allowed in the body per message
body_maxlen = 10000
# Define object templates
templates = (
'{schema}://{botname}@{organization}/{token}',
'{schema}://{botname}@{organization}/{token}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'botname': {
'name': _('Bot Name'),
'type': 'string',
},
'organization': {
'name': _('Organization'),
'type': 'string',
'required': True,
},
'token': {
'name': _('Token'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'[A-Z0-9]{32}', 'i'),
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'map_to': 'targets',
},
'target_channel': {
'name': _('Target Channel'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
})
# The default hostname to append to a defined organization
# if one isn't defined in the apprise url
default_hostname = 'zulipchat.com'
# The default channel to notify if no targets are specified
default_notification_channel = 'general'
def __init__(self, botname, organization, token, targets=None, **kwargs):
"""
Initialize Zulip Object
"""
super(NotifyZulip, self).__init__(**kwargs)
# our default hostname
self.hostname = self.default_hostname
try:
match = VALIDATE_BOTNAME.match(botname.strip())
if not match:
# let outer exception handle this
raise TypeError
# The botname
self.botname = match.group('name')
except (TypeError, AttributeError):
msg = 'The Zulip botname specified ({}) is invalid.'\
.format(botname)
self.logger.warning(msg)
raise TypeError(msg)
try:
match = VALIDATE_ORG.match(organization.strip())
if not match:
# let outer exception handle this
raise TypeError
# The organization
self.organization = match.group('org')
if match.group('hostname'):
self.hostname = match.group('hostname')
except (TypeError, AttributeError):
msg = 'The Zulip organization specified ({}) is invalid.'\
.format(organization)
self.logger.warning(msg)
raise TypeError(msg)
try:
if not VALIDATE_TOKEN.match(token.strip()):
# let outer exception handle this
raise TypeError
except (TypeError, AttributeError):
msg = 'The Zulip token specified ({}) is invalid.'\
.format(token)
self.logger.warning(msg)
raise TypeError(msg)
# The token associated with the account
self.token = token.strip()
self.targets = parse_list(targets)
if len(self.targets) == 0:
# No channels identified, use default
self.targets.append(self.default_notification_channel)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Zulip Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
}
# error tracking (used for function return)
has_error = False
# Prepare our notification URL
url = self.notify_url.format(
org=self.organization,
hostname=self.hostname,
)
# prepare JSON Object
payload = {
'subject': title,
'content': body,
}
# Determine Authentication
auth = (
'{botname}-bot@{org}.{hostname}'.format(
botname=self.botname,
org=self.organization,
hostname=self.hostname,
),
self.token,
)
# Create a copy of the target list
targets = list(self.targets)
while len(targets):
target = targets.pop(0)
if GET_EMAIL_RE.match(target):
# Send a private message
payload['type'] = 'private'
else:
# Send a stream message
payload['type'] = 'stream'
# Set our target
payload['to'] = target
self.logger.debug('Zulip POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('Zulip Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyZulip.http_response_code_lookup(
r.status_code, ZULIP_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Zulip notification to {}: '
'{}{}error={}.'.format(
target,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent Zulip notification to {}.'.format(target))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Zulip '
'notification to {}.'.format(target))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
# simplify our organization in our URL if we can
organization = '{}{}'.format(
self.organization,
'.{}'.format(self.hostname)
if self.hostname != self.default_hostname else '')
return '{schema}://{botname}@{org}/{token}/' \
'{targets}?{args}'.format(
schema=self.secure_protocol,
botname=self.botname,
org=NotifyZulip.quote(organization, safe=''),
token=NotifyZulip.quote(self.token, safe=''),
targets='/'.join(
[NotifyZulip.quote(x, safe='') for x in self.targets]),
args=NotifyZulip.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# The botname
results['botname'] = NotifyZulip.unquote(results['user'])
# The first token is stored in the hostname
results['organization'] = NotifyZulip.unquote(results['host'])
# Now fetch the remaining tokens
try:
results['token'] = \
NotifyZulip.split_path(results['fullpath'])[0]
except IndexError:
# no token
results['token'] = None
# Get unquoted entries
results['targets'] = NotifyZulip.split_path(results['fullpath'])[1:]
# Support the 'to' variable so that we can support rooms this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += [x for x in filter(
bool, TARGET_LIST_DELIM.split(
NotifyZulip.unquote(results['qsd']['to'])))]
return results
Loading…
Cancel
Save