You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/apprise/plugins/NotifyRocketChat.py

708 lines
23 KiB

# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import six
import requests
from json import loads
from json import dumps
from itertools import chain
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyImageSize
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import parse_list
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
IS_CHANNEL = re.compile(r'^#(?P<name>[A-Za-z0-9_-]+)$')
IS_USER = re.compile(r'^@(?P<name>[A-Za-z0-9._-]+)$')
IS_ROOM_ID = re.compile(r'^(?P<name>[A-Za-z0-9]+)$')
# Extend HTTP Error Messages
RC_HTTP_ERROR_MAP = {
400: 'Channel/RoomId is wrong format, or missing from server.',
401: 'Authentication tokens provided is invalid or missing.',
}
# Used to break apart list of potential tags by their delimiter
# into a usable list.
LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
class RocketChatAuthMode(object):
"""
The Chat Authentication mode is detected
"""
# providing a webhook
WEBHOOK = "webhook"
# Providing a username and password (default)
BASIC = "basic"
# Define our authentication modes
ROCKETCHAT_AUTH_MODES = (
RocketChatAuthMode.WEBHOOK,
RocketChatAuthMode.BASIC,
)
class NotifyRocketChat(NotifyBase):
"""
A wrapper for Notify Rocket.Chat Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Rocket.Chat'
# The services URL
service_url = 'https://rocket.chat/'
# The default protocol
protocol = 'rocket'
# The default secure protocol
secure_protocol = 'rockets'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_rocketchat'
# Allows the user to specify the NotifyImageSize object; this is supported
# through the webhook
image_size = NotifyImageSize.XY_128
# The title is not used
title_maxlen = 0
# The maximum size of the message
body_maxlen = 1000
# Default to markdown
notify_format = NotifyFormat.MARKDOWN
# Define object templates
templates = (
'{schema}://{user}:{password}@{host}:{port}/{targets}',
'{schema}://{user}:{password}@{host}/{targets}',
'{schema}://{webhook}@{host}',
'{schema}://{webhook}@{host}:{port}',
'{schema}://{webhook}@{host}/{targets}',
'{schema}://{webhook}@{host}:{port}/{targets}',
)
# Define our template arguments
template_tokens = dict(NotifyBase.template_tokens, **{
'host': {
'name': _('Hostname'),
'type': 'string',
'required': True,
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
'webhook': {
'name': _('Webhook'),
'type': 'string',
},
'target_channel': {
'name': _('Target Channel'),
'type': 'string',
'prefix': '#',
'map_to': 'targets',
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'prefix': '@',
'map_to': 'targets',
},
'target_room': {
'name': _('Target Room ID'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'mode': {
'name': _('Webhook Mode'),
'type': 'choice:string',
'values': ROCKETCHAT_AUTH_MODES,
},
'avatar': {
'name': _('Use Avatar'),
'type': 'bool',
'default': True,
},
'to': {
'alias_of': 'targets',
},
})
def __init__(self, webhook=None, targets=None, mode=None, avatar=True,
**kwargs):
"""
Initialize Notify Rocket.Chat Object
"""
super(NotifyRocketChat, self).__init__(**kwargs)
# Set our schema
self.schema = 'https' if self.secure else 'http'
# Prepare our URL
self.api_url = '%s://%s' % (self.schema, self.host)
if isinstance(self.port, int):
self.api_url += ':%d' % self.port
# Initialize channels list
self.channels = list()
# Initialize room list
self.rooms = list()
# Initialize user list (webhook only)
self.users = list()
# Assign our webhook (if defined)
self.webhook = webhook
# Place an avatar image to associate with our content
self.avatar = avatar
# Used to track token headers upon authentication (if successful)
# This is only used if not on webhook mode
self.headers = {}
# Authentication mode
self.mode = None \
if not isinstance(mode, six.string_types) \
else mode.lower()
if self.mode and self.mode not in ROCKETCHAT_AUTH_MODES:
msg = 'The authentication mode specified ({}) is invalid.'.format(
mode)
self.logger.warning(msg)
raise TypeError(msg)
# Detect our mode if it wasn't specified
if not self.mode:
if self.webhook is not None:
# Just a username was specified, we treat this as a webhook
self.mode = RocketChatAuthMode.WEBHOOK
else:
self.mode = RocketChatAuthMode.BASIC
if self.mode == RocketChatAuthMode.BASIC \
and not (self.user and self.password):
# Username & Password is required for Rocket Chat to work
msg = 'No Rocket.Chat user/pass combo was specified.'
self.logger.warning(msg)
raise TypeError(msg)
elif self.mode == RocketChatAuthMode.WEBHOOK and not self.webhook:
msg = 'No Rocket.Chat Incoming Webhook was specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Validate recipients and drop bad ones:
for recipient in parse_list(targets):
result = IS_CHANNEL.match(recipient)
if result:
# store valid device
self.channels.append(result.group('name'))
continue
result = IS_ROOM_ID.match(recipient)
if result:
# store valid room
self.rooms.append(result.group('name'))
continue
result = IS_USER.match(recipient)
if result:
# store valid room
self.users.append(result.group('name'))
continue
self.logger.warning(
'Dropped invalid channel/room/user '
'({}) specified.'.format(recipient),
)
if self.mode == RocketChatAuthMode.BASIC and \
len(self.rooms) == 0 and len(self.channels) == 0:
msg = 'No Rocket.Chat room and/or channels specified to notify.'
self.logger.warning(msg)
raise TypeError(msg)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'avatar': 'yes' if self.avatar else 'no',
'mode': self.mode,
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
# Determine Authentication
if self.mode == RocketChatAuthMode.BASIC:
auth = '{user}:{password}@'.format(
user=NotifyRocketChat.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
else:
auth = '{user}{webhook}@'.format(
user='{}:'.format(NotifyRocketChat.quote(self.user, safe=''))
if self.user else '',
webhook=self.pprint(self.webhook, privacy, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/{targets}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='/'.join(
[NotifyRocketChat.quote(x, safe='') for x in chain(
# Channels are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.channels],
# Rooms are as is
self.rooms,
# Users
['@{}'.format(x) for x in self.users],
)]),
params=NotifyRocketChat.urlencode(params),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
wrapper to _send since we can alert more then one channel
"""
# Call the _send_ function applicable to whatever mode we're in
# - calls _send_webhook_notification if the mode variable is set
# - calls _send_basic_notification if the mode variable is not set
return getattr(self, '_send_{}_notification'.format(self.mode))(
body=body, title=title, notify_type=notify_type, **kwargs)
def _send_webhook_notification(self, body, title='',
notify_type=NotifyType.INFO, **kwargs):
"""
Sends a webhook notification
"""
# Our payload object
payload = self._payload(body, title, notify_type)
# Assemble our webhook URL
path = 'hooks/{}'.format(self.webhook)
# Build our list of channels/rooms/users (if any identified)
targets = ['@{}'.format(u) for u in self.users]
targets.extend(['#{}'.format(c) for c in self.channels])
targets.extend(['{}'.format(r) for r in self.rooms])
if len(targets) == 0:
# We can take an early exit
return self._send(
payload, notify_type=notify_type, path=path, **kwargs)
# Otherwise we want to iterate over each of the targets
# Initiaize our error tracking
has_error = False
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
}
while len(targets):
# Retrieve our target
target = targets.pop(0)
# Assign our channel/room/user
payload['channel'] = target
if not self._send(
dumps(payload), notify_type=notify_type, path=path,
headers=headers, **kwargs):
# toggle flag
has_error = True
return not has_error
def _send_basic_notification(self, body, title='',
notify_type=NotifyType.INFO, **kwargs):
"""
Authenticates with the server using a user/pass combo for
notifications.
"""
# Track whether we authenticated okay
if not self.login():
return False
# prepare JSON Object
payload = self._payload(body, title, notify_type)
# Initiaize our error tracking
has_error = False
# Create a copy of our channels to notify against
channels = list(self.channels)
_payload = payload.copy()
while len(channels) > 0:
# Get Channel
channel = channels.pop(0)
_payload['channel'] = channel
if not self._send(
_payload, notify_type=notify_type, headers=self.headers,
**kwargs):
# toggle flag
has_error = True
# Create a copy of our room id's to notify against
rooms = list(self.rooms)
_payload = payload.copy()
while len(rooms):
# Get Room
room = rooms.pop(0)
_payload['roomId'] = room
if not self._send(
payload, notify_type=notify_type, headers=self.headers,
**kwargs):
# toggle flag
has_error = True
# logout
self.logout()
return not has_error
def _payload(self, body, title='', notify_type=NotifyType.INFO):
"""
Prepares a payload object
"""
# prepare JSON Object
payload = {
"text": body,
}
# apply our images if they're set to be displayed
image_url = self.image_url(notify_type)
if self.avatar:
payload['avatar'] = image_url
return payload
def _send(self, payload, notify_type, path='api/v1/chat.postMessage',
headers=None, **kwargs):
"""
Perform Notify Rocket.Chat Notification
"""
api_url = '{}/{}'.format(self.api_url, path)
self.logger.debug('Rocket.Chat POST URL: %s (cert_verify=%r)' % (
api_url, self.verify_certificate))
self.logger.debug('Rocket.Chat Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
api_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyRocketChat.http_response_code_lookup(
r.status_code, RC_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Rocket.Chat {}:notification: '
'{}{}error={}.'.format(
self.mode,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info(
'Sent Rocket.Chat {}:notification.'.format(self.mode))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending Rocket.Chat '
'{}:notification.'.format(self.mode))
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
def login(self):
"""
login to our server
"""
payload = {
'username': self.user,
'password': self.password,
}
api_url = '{}/{}'.format(self.api_url, 'api/v1/login')
try:
r = requests.post(
api_url,
data=payload,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyRocketChat.http_response_code_lookup(
r.status_code, RC_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to authenticate {} with Rocket.Chat: '
'{}{}error={}.'.format(
self.user,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.debug('Rocket.Chat authentication successful')
response = loads(r.content)
if response.get('status') != "success":
self.logger.warning(
'Could not authenticate {} with Rocket.Chat.'.format(
self.user))
return False
# Set our headers for further communication
self.headers['X-Auth-Token'] = response.get(
'data', {'authToken': None}).get('authToken')
self.headers['X-User-Id'] = response.get(
'data', {'userId': None}).get('userId')
except (AttributeError, TypeError, ValueError):
# Our response was not the JSON type we had expected it to be
# - ValueError = r.content is Unparsable
# - TypeError = r.content is None
# - AttributeError = r is None
self.logger.warning(
'A commuication error occurred authenticating {} on '
'Rocket.Chat.'.format(self.user))
return False
except requests.RequestException as e:
self.logger.warning(
'A connection error occurred authenticating {} on '
'Rocket.Chat.'.format(self.user))
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
def logout(self):
"""
logout of our server
"""
api_url = '{}/{}'.format(self.api_url, 'api/v1/logout')
try:
r = requests.post(
api_url,
headers=self.headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyRocketChat.http_response_code_lookup(
r.status_code, RC_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to logoff {} from Rocket.Chat: '
'{}{}error={}.'.format(
self.user,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.debug(
'Rocket.Chat log off successful; response %s.' % (
r.content))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred logging off the '
'Rocket.Chat server')
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
try:
# Attempt to detect the webhook (if specified in the URL)
# If no webhook is specified, then we just pass along as if nothing
# happened. However if we do find a webhook, we want to rebuild our
# URL without it since it conflicts with standard URLs. Support
# %2F since that is a forward slash escaped
# rocket://webhook@host
# rocket://user:webhook@host
match = re.match(
r'^\s*(?P<schema>[^:]+://)((?P<user>[^:]+):)?'
r'(?P<webhook>[a-z0-9]+(/|%2F)'
r'[a-z0-9]+)\@(?P<url>.+)$', url, re.I)
except TypeError:
# Not a string
return None
if match:
# Re-assemble our URL without the webhook
url = '{schema}{user}{url}'.format(
schema=match.group('schema'),
user='{}@'.format(match.group('user'))
if match.group('user') else '',
url=match.group('url'),
)
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
if match:
# store our webhook
results['webhook'] = \
NotifyRocketChat.unquote(match.group('webhook'))
# Take on the password too in the event we're in basic mode
# We do not unquote() as this is done at a later state
results['password'] = match.group('webhook')
# Apply our targets
results['targets'] = NotifyRocketChat.split_path(results['fullpath'])
# The user may have forced the mode
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = \
NotifyRocketChat.unquote(results['qsd']['mode'])
# avatar icon
results['avatar'] = \
parse_bool(results['qsd'].get('avatar', True))
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyRocketChat.parse_list(results['qsd']['to'])
# The 'webhook' over-ride (if specified)
if 'webhook' in results['qsd'] and len(results['qsd']['webhook']):
results['webhook'] = \
NotifyRocketChat.unquote(results['qsd']['webhook'])
return results