parent
ae731bb78b
commit
e6b8b1ad19
Before Width: | Height: | Size: 36 KiB After Width: | Height: | Size: 157 KiB |
@ -1,374 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
#
|
|
||||||
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This code is licensed under the MIT License.
|
|
||||||
#
|
|
||||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
# of this software and associated documentation files(the "Software"), to deal
|
|
||||||
# in the Software without restriction, including without limitation the rights
|
|
||||||
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
|
||||||
# copies of the Software, and to permit persons to whom the Software is
|
|
||||||
# furnished to do so, subject to the following conditions :
|
|
||||||
#
|
|
||||||
# The above copyright notice and this permission notice shall be included in
|
|
||||||
# all copies or substantial portions of the Software.
|
|
||||||
#
|
|
||||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
|
||||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
||||||
# THE SOFTWARE.
|
|
||||||
|
|
||||||
from .gntp import notifier
|
|
||||||
from .gntp import errors
|
|
||||||
from ..NotifyBase import NotifyBase
|
|
||||||
from ...URLBase import PrivacyMode
|
|
||||||
from ...common import NotifyImageSize
|
|
||||||
from ...common import NotifyType
|
|
||||||
from ...utils import parse_bool
|
|
||||||
from ...AppriseLocale import gettext_lazy as _
|
|
||||||
|
|
||||||
|
|
||||||
# Priorities
|
|
||||||
class GrowlPriority(object):
|
|
||||||
LOW = -2
|
|
||||||
MODERATE = -1
|
|
||||||
NORMAL = 0
|
|
||||||
HIGH = 1
|
|
||||||
EMERGENCY = 2
|
|
||||||
|
|
||||||
|
|
||||||
GROWL_PRIORITIES = (
|
|
||||||
GrowlPriority.LOW,
|
|
||||||
GrowlPriority.MODERATE,
|
|
||||||
GrowlPriority.NORMAL,
|
|
||||||
GrowlPriority.HIGH,
|
|
||||||
GrowlPriority.EMERGENCY,
|
|
||||||
)
|
|
||||||
|
|
||||||
GROWL_NOTIFICATION_TYPE = "New Messages"
|
|
||||||
|
|
||||||
|
|
||||||
class NotifyGrowl(NotifyBase):
|
|
||||||
"""
|
|
||||||
A wrapper to Growl Notifications
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
# The default descriptive name associated with the Notification
|
|
||||||
service_name = 'Growl'
|
|
||||||
|
|
||||||
# The services URL
|
|
||||||
service_url = 'http://growl.info/'
|
|
||||||
|
|
||||||
# The default protocol
|
|
||||||
protocol = 'growl'
|
|
||||||
|
|
||||||
# A URL that takes you to the setup/help of the specific protocol
|
|
||||||
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_growl'
|
|
||||||
|
|
||||||
# Allows the user to specify the NotifyImageSize object
|
|
||||||
image_size = NotifyImageSize.XY_72
|
|
||||||
|
|
||||||
# Disable throttle rate for Growl requests since they are normally
|
|
||||||
# local anyway
|
|
||||||
request_rate_per_sec = 0
|
|
||||||
|
|
||||||
# A title can not be used for Growl Messages. Setting this to zero will
|
|
||||||
# cause any title (if defined) to get placed into the message body.
|
|
||||||
title_maxlen = 0
|
|
||||||
|
|
||||||
# Limit results to just the first 10 line otherwise there is just to much
|
|
||||||
# content to display
|
|
||||||
body_max_line_count = 2
|
|
||||||
|
|
||||||
# Default Growl Port
|
|
||||||
default_port = 23053
|
|
||||||
|
|
||||||
# Define object templates
|
|
||||||
# Define object templates
|
|
||||||
templates = (
|
|
||||||
'{schema}://{host}',
|
|
||||||
'{schema}://{host}:{port}',
|
|
||||||
'{schema}://{password}@{host}',
|
|
||||||
'{schema}://{password}@{host}:{port}',
|
|
||||||
)
|
|
||||||
|
|
||||||
# Define our template tokens
|
|
||||||
template_tokens = dict(NotifyBase.template_tokens, **{
|
|
||||||
'host': {
|
|
||||||
'name': _('Hostname'),
|
|
||||||
'type': 'string',
|
|
||||||
'required': True,
|
|
||||||
},
|
|
||||||
'port': {
|
|
||||||
'name': _('Port'),
|
|
||||||
'type': 'int',
|
|
||||||
'min': 1,
|
|
||||||
'max': 65535,
|
|
||||||
},
|
|
||||||
'password': {
|
|
||||||
'name': _('Password'),
|
|
||||||
'type': 'string',
|
|
||||||
'private': True,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
# Define our template arguments
|
|
||||||
template_args = dict(NotifyBase.template_args, **{
|
|
||||||
'priority': {
|
|
||||||
'name': _('Priority'),
|
|
||||||
'type': 'choice:int',
|
|
||||||
'values': GROWL_PRIORITIES,
|
|
||||||
'default': GrowlPriority.NORMAL,
|
|
||||||
},
|
|
||||||
'version': {
|
|
||||||
'name': _('Version'),
|
|
||||||
'type': 'choice:int',
|
|
||||||
'values': (1, 2),
|
|
||||||
'default': 2,
|
|
||||||
},
|
|
||||||
'image': {
|
|
||||||
'name': _('Include Image'),
|
|
||||||
'type': 'bool',
|
|
||||||
'default': True,
|
|
||||||
'map_to': 'include_image',
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
def __init__(self, priority=None, version=2, include_image=True, **kwargs):
|
|
||||||
"""
|
|
||||||
Initialize Growl Object
|
|
||||||
"""
|
|
||||||
super(NotifyGrowl, self).__init__(**kwargs)
|
|
||||||
|
|
||||||
if not self.port:
|
|
||||||
self.port = self.default_port
|
|
||||||
|
|
||||||
# The Priority of the message
|
|
||||||
if priority not in GROWL_PRIORITIES:
|
|
||||||
self.priority = GrowlPriority.NORMAL
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.priority = priority
|
|
||||||
|
|
||||||
# Always default the sticky flag to False
|
|
||||||
self.sticky = False
|
|
||||||
|
|
||||||
# Store Version
|
|
||||||
self.version = version
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
'applicationName': self.app_id,
|
|
||||||
'notifications': [GROWL_NOTIFICATION_TYPE, ],
|
|
||||||
'defaultNotifications': [GROWL_NOTIFICATION_TYPE, ],
|
|
||||||
'hostname': self.host,
|
|
||||||
'port': self.port,
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.password is not None:
|
|
||||||
payload['password'] = self.password
|
|
||||||
|
|
||||||
self.logger.debug('Growl Registration Payload: %s' % str(payload))
|
|
||||||
self.growl = notifier.GrowlNotifier(**payload)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.growl.register()
|
|
||||||
self.logger.debug(
|
|
||||||
'Growl server registration completed successfully.'
|
|
||||||
)
|
|
||||||
|
|
||||||
except errors.NetworkError:
|
|
||||||
msg = 'A network error occured sending Growl ' \
|
|
||||||
'notification to {}.'.format(self.host)
|
|
||||||
self.logger.warning(msg)
|
|
||||||
raise TypeError(msg)
|
|
||||||
|
|
||||||
except errors.AuthError:
|
|
||||||
msg = 'An authentication error occured sending Growl ' \
|
|
||||||
'notification to {}.'.format(self.host)
|
|
||||||
self.logger.warning(msg)
|
|
||||||
raise TypeError(msg)
|
|
||||||
|
|
||||||
except errors.UnsupportedError:
|
|
||||||
msg = 'An unsupported error occured sending Growl ' \
|
|
||||||
'notification to {}.'.format(self.host)
|
|
||||||
self.logger.warning(msg)
|
|
||||||
raise TypeError(msg)
|
|
||||||
|
|
||||||
# Track whether or not we want to send an image with our notification
|
|
||||||
# or not.
|
|
||||||
self.include_image = include_image
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
|
|
||||||
"""
|
|
||||||
Perform Growl Notification
|
|
||||||
"""
|
|
||||||
|
|
||||||
icon = None
|
|
||||||
if self.version >= 2:
|
|
||||||
# URL Based
|
|
||||||
icon = None if not self.include_image \
|
|
||||||
else self.image_url(notify_type)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Raw
|
|
||||||
icon = None if not self.include_image \
|
|
||||||
else self.image_raw(notify_type)
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
'noteType': GROWL_NOTIFICATION_TYPE,
|
|
||||||
'title': title,
|
|
||||||
'description': body,
|
|
||||||
'icon': icon is not None,
|
|
||||||
'sticky': False,
|
|
||||||
'priority': self.priority,
|
|
||||||
}
|
|
||||||
self.logger.debug('Growl Payload: %s' % str(payload))
|
|
||||||
|
|
||||||
# Update icon of payload to be raw data; this is intentionally done
|
|
||||||
# here after we spit the debug message above (so we don't try to
|
|
||||||
# print the binary contents of an image
|
|
||||||
payload['icon'] = icon
|
|
||||||
|
|
||||||
# Always call throttle before any remote server i/o is made
|
|
||||||
self.throttle()
|
|
||||||
|
|
||||||
try:
|
|
||||||
response = self.growl.notify(**payload)
|
|
||||||
if not isinstance(response, bool):
|
|
||||||
self.logger.warning(
|
|
||||||
'Growl notification failed to send with response: %s' %
|
|
||||||
str(response),
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.logger.info('Sent Growl notification.')
|
|
||||||
|
|
||||||
except errors.BaseError as e:
|
|
||||||
# Since Growl servers listen for UDP broadcasts, it's possible
|
|
||||||
# that you will never get to this part of the code since there is
|
|
||||||
# no acknowledgement as to whether it accepted what was sent to it
|
|
||||||
# or not.
|
|
||||||
|
|
||||||
# However, if the host/server is unavailable, you will get to this
|
|
||||||
# point of the code.
|
|
||||||
self.logger.warning(
|
|
||||||
'A Connection error occured sending Growl '
|
|
||||||
'notification to %s.' % self.host)
|
|
||||||
self.logger.debug('Growl Exception: %s' % str(e))
|
|
||||||
|
|
||||||
# Return; we're done
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def url(self, privacy=False, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
Returns the URL built dynamically based on specified arguments.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_map = {
|
|
||||||
GrowlPriority.LOW: 'low',
|
|
||||||
GrowlPriority.MODERATE: 'moderate',
|
|
||||||
GrowlPriority.NORMAL: 'normal',
|
|
||||||
GrowlPriority.HIGH: 'high',
|
|
||||||
GrowlPriority.EMERGENCY: 'emergency',
|
|
||||||
}
|
|
||||||
|
|
||||||
# Define any arguments set
|
|
||||||
args = {
|
|
||||||
'format': self.notify_format,
|
|
||||||
'overflow': self.overflow_mode,
|
|
||||||
'image': 'yes' if self.include_image else 'no',
|
|
||||||
'priority':
|
|
||||||
_map[GrowlPriority.NORMAL] if self.priority not in _map
|
|
||||||
else _map[self.priority],
|
|
||||||
'version': self.version,
|
|
||||||
'verify': 'yes' if self.verify_certificate else 'no',
|
|
||||||
}
|
|
||||||
|
|
||||||
auth = ''
|
|
||||||
if self.user:
|
|
||||||
# The growl password is stored in the user field
|
|
||||||
auth = '{password}@'.format(
|
|
||||||
password=self.pprint(
|
|
||||||
self.user, privacy, mode=PrivacyMode.Secret, safe=''),
|
|
||||||
)
|
|
||||||
|
|
||||||
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
|
|
||||||
schema=self.secure_protocol if self.secure else self.protocol,
|
|
||||||
auth=auth,
|
|
||||||
hostname=NotifyGrowl.quote(self.host, safe=''),
|
|
||||||
port='' if self.port is None or self.port == self.default_port
|
|
||||||
else ':{}'.format(self.port),
|
|
||||||
args=NotifyGrowl.urlencode(args),
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def parse_url(url):
|
|
||||||
"""
|
|
||||||
Parses the URL and returns enough arguments that can allow
|
|
||||||
us to substantiate this object.
|
|
||||||
|
|
||||||
"""
|
|
||||||
results = NotifyBase.parse_url(url)
|
|
||||||
|
|
||||||
if not results:
|
|
||||||
# We're done early as we couldn't load the results
|
|
||||||
return results
|
|
||||||
|
|
||||||
version = None
|
|
||||||
if 'version' in results['qsd'] and len(results['qsd']['version']):
|
|
||||||
# Allow the user to specify the version of the protocol to use.
|
|
||||||
try:
|
|
||||||
version = int(
|
|
||||||
NotifyGrowl.unquote(
|
|
||||||
results['qsd']['version']).strip().split('.')[0])
|
|
||||||
|
|
||||||
except (AttributeError, IndexError, TypeError, ValueError):
|
|
||||||
NotifyGrowl.logger.warning(
|
|
||||||
'An invalid Growl version of "%s" was specified and will '
|
|
||||||
'be ignored.' % results['qsd']['version']
|
|
||||||
)
|
|
||||||
pass
|
|
||||||
|
|
||||||
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
|
|
||||||
_map = {
|
|
||||||
'l': GrowlPriority.LOW,
|
|
||||||
'm': GrowlPriority.MODERATE,
|
|
||||||
'n': GrowlPriority.NORMAL,
|
|
||||||
'h': GrowlPriority.HIGH,
|
|
||||||
'e': GrowlPriority.EMERGENCY,
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
results['priority'] = \
|
|
||||||
_map[results['qsd']['priority'][0].lower()]
|
|
||||||
|
|
||||||
except KeyError:
|
|
||||||
# No priority was set
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Because of the URL formatting, the password is actually where the
|
|
||||||
# username field is. For this reason, we just preform this small hack
|
|
||||||
# to make it (the URL) conform correctly. The following strips out the
|
|
||||||
# existing password entry (if exists) so that it can be swapped with
|
|
||||||
# the new one we specify.
|
|
||||||
if results.get('password', None) is None:
|
|
||||||
results['password'] = results.get('user', None)
|
|
||||||
|
|
||||||
# Include images with our message
|
|
||||||
results['include_image'] = \
|
|
||||||
parse_bool(results['qsd'].get('image', True))
|
|
||||||
|
|
||||||
# Set our version
|
|
||||||
if version:
|
|
||||||
results['version'] = version
|
|
||||||
|
|
||||||
return results
|
|
@ -1,141 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from optparse import OptionParser, OptionGroup
|
|
||||||
|
|
||||||
from .notifier import GrowlNotifier
|
|
||||||
from .shim import RawConfigParser
|
|
||||||
from .version import __version__
|
|
||||||
|
|
||||||
DEFAULT_CONFIG = os.path.expanduser('~/.gntp')
|
|
||||||
|
|
||||||
config = RawConfigParser({
|
|
||||||
'hostname': 'localhost',
|
|
||||||
'password': None,
|
|
||||||
'port': 23053,
|
|
||||||
})
|
|
||||||
config.read([DEFAULT_CONFIG])
|
|
||||||
if not config.has_section('gntp'):
|
|
||||||
config.add_section('gntp')
|
|
||||||
|
|
||||||
|
|
||||||
class ClientParser(OptionParser):
|
|
||||||
def __init__(self):
|
|
||||||
OptionParser.__init__(self, version="%%prog %s" % __version__)
|
|
||||||
|
|
||||||
group = OptionGroup(self, "Network Options")
|
|
||||||
group.add_option("-H", "--host",
|
|
||||||
dest="host", default=config.get('gntp', 'hostname'),
|
|
||||||
help="Specify a hostname to which to send a remote notification. [%default]")
|
|
||||||
group.add_option("--port",
|
|
||||||
dest="port", default=config.getint('gntp', 'port'), type="int",
|
|
||||||
help="port to listen on [%default]")
|
|
||||||
group.add_option("-P", "--password",
|
|
||||||
dest='password', default=config.get('gntp', 'password'),
|
|
||||||
help="Network password")
|
|
||||||
self.add_option_group(group)
|
|
||||||
|
|
||||||
group = OptionGroup(self, "Notification Options")
|
|
||||||
group.add_option("-n", "--name",
|
|
||||||
dest="app", default='Python GNTP Test Client',
|
|
||||||
help="Set the name of the application [%default]")
|
|
||||||
group.add_option("-s", "--sticky",
|
|
||||||
dest='sticky', default=False, action="store_true",
|
|
||||||
help="Make the notification sticky [%default]")
|
|
||||||
group.add_option("--image",
|
|
||||||
dest="icon", default=None,
|
|
||||||
help="Icon for notification (URL or /path/to/file)")
|
|
||||||
group.add_option("-m", "--message",
|
|
||||||
dest="message", default=None,
|
|
||||||
help="Sets the message instead of using stdin")
|
|
||||||
group.add_option("-p", "--priority",
|
|
||||||
dest="priority", default=0, type="int",
|
|
||||||
help="-2 to 2 [%default]")
|
|
||||||
group.add_option("-d", "--identifier",
|
|
||||||
dest="identifier",
|
|
||||||
help="Identifier for coalescing")
|
|
||||||
group.add_option("-t", "--title",
|
|
||||||
dest="title", default=None,
|
|
||||||
help="Set the title of the notification [%default]")
|
|
||||||
group.add_option("-N", "--notification",
|
|
||||||
dest="name", default='Notification',
|
|
||||||
help="Set the notification name [%default]")
|
|
||||||
group.add_option("--callback",
|
|
||||||
dest="callback",
|
|
||||||
help="URL callback")
|
|
||||||
self.add_option_group(group)
|
|
||||||
|
|
||||||
# Extra Options
|
|
||||||
self.add_option('-v', '--verbose',
|
|
||||||
dest='verbose', default=0, action='count',
|
|
||||||
help="Verbosity levels")
|
|
||||||
|
|
||||||
def parse_args(self, args=None, values=None):
|
|
||||||
values, args = OptionParser.parse_args(self, args, values)
|
|
||||||
|
|
||||||
if values.message is None:
|
|
||||||
print('Enter a message followed by Ctrl-D')
|
|
||||||
try:
|
|
||||||
message = sys.stdin.read()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
exit()
|
|
||||||
else:
|
|
||||||
message = values.message
|
|
||||||
|
|
||||||
if values.title is None:
|
|
||||||
values.title = ' '.join(args)
|
|
||||||
|
|
||||||
# If we still have an empty title, use the
|
|
||||||
# first bit of the message as the title
|
|
||||||
if values.title == '':
|
|
||||||
values.title = message[:20]
|
|
||||||
|
|
||||||
values.verbose = logging.WARNING - values.verbose * 10
|
|
||||||
|
|
||||||
return values, message
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
(options, message) = ClientParser().parse_args()
|
|
||||||
logging.basicConfig(level=options.verbose)
|
|
||||||
if not os.path.exists(DEFAULT_CONFIG):
|
|
||||||
logging.info('No config read found at %s', DEFAULT_CONFIG)
|
|
||||||
|
|
||||||
growl = GrowlNotifier(
|
|
||||||
applicationName=options.app,
|
|
||||||
notifications=[options.name],
|
|
||||||
defaultNotifications=[options.name],
|
|
||||||
hostname=options.host,
|
|
||||||
password=options.password,
|
|
||||||
port=options.port,
|
|
||||||
)
|
|
||||||
result = growl.register()
|
|
||||||
if result is not True:
|
|
||||||
exit(result)
|
|
||||||
|
|
||||||
# This would likely be better placed within the growl notifier
|
|
||||||
# class but until I make _checkIcon smarter this is "easier"
|
|
||||||
if options.icon is not None and not options.icon.startswith('http'):
|
|
||||||
logging.info('Loading image %s', options.icon)
|
|
||||||
f = open(options.icon)
|
|
||||||
options.icon = f.read()
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
result = growl.notify(
|
|
||||||
noteType=options.name,
|
|
||||||
title=options.title,
|
|
||||||
description=message,
|
|
||||||
icon=options.icon,
|
|
||||||
sticky=options.sticky,
|
|
||||||
priority=options.priority,
|
|
||||||
callback=options.callback,
|
|
||||||
identifier=options.identifier,
|
|
||||||
)
|
|
||||||
if result is not True:
|
|
||||||
exit(result)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,77 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
"""
|
|
||||||
The gntp.config module is provided as an extended GrowlNotifier object that takes
|
|
||||||
advantage of the ConfigParser module to allow us to setup some default values
|
|
||||||
(such as hostname, password, and port) in a more global way to be shared among
|
|
||||||
programs using gntp
|
|
||||||
"""
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
|
|
||||||
from .gntp import notifier
|
|
||||||
from .gntp import shim
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
'mini',
|
|
||||||
'GrowlNotifier'
|
|
||||||
]
|
|
||||||
|
|
||||||
logger = logging.getLogger('gntp')
|
|
||||||
|
|
||||||
|
|
||||||
class GrowlNotifier(notifier.GrowlNotifier):
|
|
||||||
"""
|
|
||||||
ConfigParser enhanced GrowlNotifier object
|
|
||||||
|
|
||||||
For right now, we are only interested in letting users overide certain
|
|
||||||
values from ~/.gntp
|
|
||||||
|
|
||||||
::
|
|
||||||
|
|
||||||
[gntp]
|
|
||||||
hostname = ?
|
|
||||||
password = ?
|
|
||||||
port = ?
|
|
||||||
"""
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
config = shim.RawConfigParser({
|
|
||||||
'hostname': kwargs.get('hostname', 'localhost'),
|
|
||||||
'password': kwargs.get('password'),
|
|
||||||
'port': kwargs.get('port', 23053),
|
|
||||||
})
|
|
||||||
|
|
||||||
config.read([os.path.expanduser('~/.gntp')])
|
|
||||||
|
|
||||||
# If the file does not exist, then there will be no gntp section defined
|
|
||||||
# and the config.get() lines below will get confused. Since we are not
|
|
||||||
# saving the config, it should be safe to just add it here so the
|
|
||||||
# code below doesn't complain
|
|
||||||
if not config.has_section('gntp'):
|
|
||||||
logger.info('Error reading ~/.gntp config file')
|
|
||||||
config.add_section('gntp')
|
|
||||||
|
|
||||||
kwargs['password'] = config.get('gntp', 'password')
|
|
||||||
kwargs['hostname'] = config.get('gntp', 'hostname')
|
|
||||||
kwargs['port'] = config.getint('gntp', 'port')
|
|
||||||
|
|
||||||
super(GrowlNotifier, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def mini(description, **kwargs):
|
|
||||||
"""Single notification function
|
|
||||||
|
|
||||||
Simple notification function in one line. Has only one required parameter
|
|
||||||
and attempts to use reasonable defaults for everything else
|
|
||||||
:param string description: Notification message
|
|
||||||
"""
|
|
||||||
kwargs['notifierFactory'] = GrowlNotifier
|
|
||||||
notifier.mini(description, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# If we're running this module directly we're likely running it as a test
|
|
||||||
# so extra debugging is useful
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
mini('Testing mini notification')
|
|
@ -1,511 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
import hashlib
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
|
|
||||||
from . import shim
|
|
||||||
from . import errors as errors
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
'GNTPRegister',
|
|
||||||
'GNTPNotice',
|
|
||||||
'GNTPSubscribe',
|
|
||||||
'GNTPOK',
|
|
||||||
'GNTPError',
|
|
||||||
'parse_gntp',
|
|
||||||
]
|
|
||||||
|
|
||||||
#GNTP/<version> <messagetype> <encryptionAlgorithmID>[:<ivValue>][ <keyHashAlgorithmID>:<keyHash>.<salt>]
|
|
||||||
GNTP_INFO_LINE = re.compile(
|
|
||||||
r'GNTP/(?P<version>\d+\.\d+) (?P<messagetype>REGISTER|NOTIFY|SUBSCRIBE|\-OK|\-ERROR)' +
|
|
||||||
r' (?P<encryptionAlgorithmID>[A-Z0-9]+(:(?P<ivValue>[A-F0-9]+))?) ?' +
|
|
||||||
r'((?P<keyHashAlgorithmID>[A-Z0-9]+):(?P<keyHash>[A-F0-9]+).(?P<salt>[A-F0-9]+))?\r\n',
|
|
||||||
re.IGNORECASE
|
|
||||||
)
|
|
||||||
|
|
||||||
GNTP_INFO_LINE_SHORT = re.compile(
|
|
||||||
r'GNTP/(?P<version>\d+\.\d+) (?P<messagetype>REGISTER|NOTIFY|SUBSCRIBE|\-OK|\-ERROR)',
|
|
||||||
re.IGNORECASE
|
|
||||||
)
|
|
||||||
|
|
||||||
GNTP_HEADER = re.compile(r'([\w-]+):(.+)')
|
|
||||||
|
|
||||||
GNTP_EOL = shim.b('\r\n')
|
|
||||||
GNTP_SEP = shim.b(': ')
|
|
||||||
|
|
||||||
|
|
||||||
class _GNTPBuffer(shim.StringIO):
|
|
||||||
"""GNTP Buffer class"""
|
|
||||||
def writeln(self, value=None):
|
|
||||||
if value:
|
|
||||||
self.write(shim.b(value))
|
|
||||||
self.write(GNTP_EOL)
|
|
||||||
|
|
||||||
def writeheader(self, key, value):
|
|
||||||
if not isinstance(value, str):
|
|
||||||
value = str(value)
|
|
||||||
self.write(shim.b(key))
|
|
||||||
self.write(GNTP_SEP)
|
|
||||||
self.write(shim.b(value))
|
|
||||||
self.write(GNTP_EOL)
|
|
||||||
|
|
||||||
|
|
||||||
class _GNTPBase(object):
|
|
||||||
"""Base initilization
|
|
||||||
|
|
||||||
:param string messagetype: GNTP Message type
|
|
||||||
:param string version: GNTP Protocol version
|
|
||||||
:param string encription: Encryption protocol
|
|
||||||
"""
|
|
||||||
def __init__(self, messagetype=None, version='1.0', encryption=None):
|
|
||||||
self.info = {
|
|
||||||
'version': version,
|
|
||||||
'messagetype': messagetype,
|
|
||||||
'encryptionAlgorithmID': encryption
|
|
||||||
}
|
|
||||||
self.hash_algo = {
|
|
||||||
'MD5': hashlib.md5,
|
|
||||||
'SHA1': hashlib.sha1,
|
|
||||||
'SHA256': hashlib.sha256,
|
|
||||||
'SHA512': hashlib.sha512,
|
|
||||||
}
|
|
||||||
self.headers = {}
|
|
||||||
self.resources = {}
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.encode()
|
|
||||||
|
|
||||||
def _parse_info(self, data):
|
|
||||||
"""Parse the first line of a GNTP message to get security and other info values
|
|
||||||
|
|
||||||
:param string data: GNTP Message
|
|
||||||
:return dict: Parsed GNTP Info line
|
|
||||||
"""
|
|
||||||
|
|
||||||
match = GNTP_INFO_LINE.match(data)
|
|
||||||
|
|
||||||
if not match:
|
|
||||||
raise errors.ParseError('ERROR_PARSING_INFO_LINE')
|
|
||||||
|
|
||||||
info = match.groupdict()
|
|
||||||
if info['encryptionAlgorithmID'] == 'NONE':
|
|
||||||
info['encryptionAlgorithmID'] = None
|
|
||||||
|
|
||||||
return info
|
|
||||||
|
|
||||||
def set_password(self, password, encryptAlgo='MD5'):
|
|
||||||
"""Set a password for a GNTP Message
|
|
||||||
|
|
||||||
:param string password: Null to clear password
|
|
||||||
:param string encryptAlgo: Supports MD5, SHA1, SHA256, SHA512
|
|
||||||
"""
|
|
||||||
if not password:
|
|
||||||
self.info['encryptionAlgorithmID'] = None
|
|
||||||
self.info['keyHashAlgorithm'] = None
|
|
||||||
return
|
|
||||||
|
|
||||||
self.password = shim.b(password)
|
|
||||||
self.encryptAlgo = encryptAlgo.upper()
|
|
||||||
|
|
||||||
if not self.encryptAlgo in self.hash_algo:
|
|
||||||
raise errors.UnsupportedError('INVALID HASH "%s"' % self.encryptAlgo)
|
|
||||||
|
|
||||||
hashfunction = self.hash_algo.get(self.encryptAlgo)
|
|
||||||
|
|
||||||
password = password.encode('utf8')
|
|
||||||
seed = time.ctime().encode('utf8')
|
|
||||||
salt = hashfunction(seed).hexdigest()
|
|
||||||
saltHash = hashfunction(seed).digest()
|
|
||||||
keyBasis = password + saltHash
|
|
||||||
key = hashfunction(keyBasis).digest()
|
|
||||||
keyHash = hashfunction(key).hexdigest()
|
|
||||||
|
|
||||||
self.info['keyHashAlgorithmID'] = self.encryptAlgo
|
|
||||||
self.info['keyHash'] = keyHash.upper()
|
|
||||||
self.info['salt'] = salt.upper()
|
|
||||||
|
|
||||||
def _decode_hex(self, value):
|
|
||||||
"""Helper function to decode hex string to `proper` hex string
|
|
||||||
|
|
||||||
:param string value: Human readable hex string
|
|
||||||
:return string: Hex string
|
|
||||||
"""
|
|
||||||
result = ''
|
|
||||||
for i in range(0, len(value), 2):
|
|
||||||
tmp = int(value[i:i + 2], 16)
|
|
||||||
result += chr(tmp)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _decode_binary(self, rawIdentifier, identifier):
|
|
||||||
rawIdentifier += '\r\n\r\n'
|
|
||||||
dataLength = int(identifier['Length'])
|
|
||||||
pointerStart = self.raw.find(rawIdentifier) + len(rawIdentifier)
|
|
||||||
pointerEnd = pointerStart + dataLength
|
|
||||||
data = self.raw[pointerStart:pointerEnd]
|
|
||||||
if not len(data) == dataLength:
|
|
||||||
raise errors.ParseError('INVALID_DATA_LENGTH Expected: %s Recieved %s' % (dataLength, len(data)))
|
|
||||||
return data
|
|
||||||
|
|
||||||
def _validate_password(self, password):
|
|
||||||
"""Validate GNTP Message against stored password"""
|
|
||||||
self.password = password
|
|
||||||
if password is None:
|
|
||||||
raise errors.AuthError('Missing password')
|
|
||||||
keyHash = self.info.get('keyHash', None)
|
|
||||||
if keyHash is None and self.password is None:
|
|
||||||
return True
|
|
||||||
if keyHash is None:
|
|
||||||
raise errors.AuthError('Invalid keyHash')
|
|
||||||
if self.password is None:
|
|
||||||
raise errors.AuthError('Missing password')
|
|
||||||
|
|
||||||
keyHashAlgorithmID = self.info.get('keyHashAlgorithmID','MD5')
|
|
||||||
|
|
||||||
password = self.password.encode('utf8')
|
|
||||||
saltHash = self._decode_hex(self.info['salt'])
|
|
||||||
|
|
||||||
keyBasis = password + saltHash
|
|
||||||
self.key = self.hash_algo[keyHashAlgorithmID](keyBasis).digest()
|
|
||||||
keyHash = self.hash_algo[keyHashAlgorithmID](self.key).hexdigest()
|
|
||||||
|
|
||||||
if not keyHash.upper() == self.info['keyHash'].upper():
|
|
||||||
raise errors.AuthError('Invalid Hash')
|
|
||||||
return True
|
|
||||||
|
|
||||||
def validate(self):
|
|
||||||
"""Verify required headers"""
|
|
||||||
for header in self._requiredHeaders:
|
|
||||||
if not self.headers.get(header, False):
|
|
||||||
raise errors.ParseError('Missing Notification Header: ' + header)
|
|
||||||
|
|
||||||
def _format_info(self):
|
|
||||||
"""Generate info line for GNTP Message
|
|
||||||
|
|
||||||
:return string:
|
|
||||||
"""
|
|
||||||
info = 'GNTP/%s %s' % (
|
|
||||||
self.info.get('version'),
|
|
||||||
self.info.get('messagetype'),
|
|
||||||
)
|
|
||||||
if self.info.get('encryptionAlgorithmID', None):
|
|
||||||
info += ' %s:%s' % (
|
|
||||||
self.info.get('encryptionAlgorithmID'),
|
|
||||||
self.info.get('ivValue'),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
info += ' NONE'
|
|
||||||
|
|
||||||
if self.info.get('keyHashAlgorithmID', None):
|
|
||||||
info += ' %s:%s.%s' % (
|
|
||||||
self.info.get('keyHashAlgorithmID'),
|
|
||||||
self.info.get('keyHash'),
|
|
||||||
self.info.get('salt')
|
|
||||||
)
|
|
||||||
|
|
||||||
return info
|
|
||||||
|
|
||||||
def _parse_dict(self, data):
|
|
||||||
"""Helper function to parse blocks of GNTP headers into a dictionary
|
|
||||||
|
|
||||||
:param string data:
|
|
||||||
:return dict: Dictionary of parsed GNTP Headers
|
|
||||||
"""
|
|
||||||
d = {}
|
|
||||||
for line in data.split('\r\n'):
|
|
||||||
match = GNTP_HEADER.match(line)
|
|
||||||
if not match:
|
|
||||||
continue
|
|
||||||
|
|
||||||
key = match.group(1).strip()
|
|
||||||
val = match.group(2).strip()
|
|
||||||
d[key] = val
|
|
||||||
return d
|
|
||||||
|
|
||||||
def add_header(self, key, value):
|
|
||||||
self.headers[key] = value
|
|
||||||
|
|
||||||
def add_resource(self, data):
|
|
||||||
"""Add binary resource
|
|
||||||
|
|
||||||
:param string data: Binary Data
|
|
||||||
"""
|
|
||||||
data = shim.b(data)
|
|
||||||
identifier = hashlib.md5(data).hexdigest()
|
|
||||||
self.resources[identifier] = data
|
|
||||||
return 'x-growl-resource://%s' % identifier
|
|
||||||
|
|
||||||
def decode(self, data, password=None):
|
|
||||||
"""Decode GNTP Message
|
|
||||||
|
|
||||||
:param string data:
|
|
||||||
"""
|
|
||||||
self.password = password
|
|
||||||
self.raw = shim.u(data)
|
|
||||||
parts = self.raw.split('\r\n\r\n')
|
|
||||||
self.info = self._parse_info(self.raw)
|
|
||||||
self.headers = self._parse_dict(parts[0])
|
|
||||||
|
|
||||||
def encode(self):
|
|
||||||
"""Encode a generic GNTP Message
|
|
||||||
|
|
||||||
:return string: GNTP Message ready to be sent. Returned as a byte string
|
|
||||||
"""
|
|
||||||
|
|
||||||
buff = _GNTPBuffer()
|
|
||||||
|
|
||||||
buff.writeln(self._format_info())
|
|
||||||
|
|
||||||
#Headers
|
|
||||||
for k, v in self.headers.items():
|
|
||||||
buff.writeheader(k, v)
|
|
||||||
buff.writeln()
|
|
||||||
|
|
||||||
#Resources
|
|
||||||
for resource, data in self.resources.items():
|
|
||||||
buff.writeheader('Identifier', resource)
|
|
||||||
buff.writeheader('Length', len(data))
|
|
||||||
buff.writeln()
|
|
||||||
buff.write(data)
|
|
||||||
buff.writeln()
|
|
||||||
buff.writeln()
|
|
||||||
|
|
||||||
return buff.getvalue()
|
|
||||||
|
|
||||||
|
|
||||||
class GNTPRegister(_GNTPBase):
|
|
||||||
"""Represents a GNTP Registration Command
|
|
||||||
|
|
||||||
:param string data: (Optional) See decode()
|
|
||||||
:param string password: (Optional) Password to use while encoding/decoding messages
|
|
||||||
"""
|
|
||||||
_requiredHeaders = [
|
|
||||||
'Application-Name',
|
|
||||||
'Notifications-Count'
|
|
||||||
]
|
|
||||||
_requiredNotificationHeaders = ['Notification-Name']
|
|
||||||
|
|
||||||
def __init__(self, data=None, password=None):
|
|
||||||
_GNTPBase.__init__(self, 'REGISTER')
|
|
||||||
self.notifications = []
|
|
||||||
|
|
||||||
if data:
|
|
||||||
self.decode(data, password)
|
|
||||||
else:
|
|
||||||
self.set_password(password)
|
|
||||||
self.add_header('Application-Name', 'pygntp')
|
|
||||||
self.add_header('Notifications-Count', 0)
|
|
||||||
|
|
||||||
def validate(self):
|
|
||||||
'''Validate required headers and validate notification headers'''
|
|
||||||
for header in self._requiredHeaders:
|
|
||||||
if not self.headers.get(header, False):
|
|
||||||
raise errors.ParseError('Missing Registration Header: ' + header)
|
|
||||||
for notice in self.notifications:
|
|
||||||
for header in self._requiredNotificationHeaders:
|
|
||||||
if not notice.get(header, False):
|
|
||||||
raise errors.ParseError('Missing Notification Header: ' + header)
|
|
||||||
|
|
||||||
def decode(self, data, password):
|
|
||||||
"""Decode existing GNTP Registration message
|
|
||||||
|
|
||||||
:param string data: Message to decode
|
|
||||||
"""
|
|
||||||
self.raw = shim.u(data)
|
|
||||||
parts = self.raw.split('\r\n\r\n')
|
|
||||||
self.info = self._parse_info(self.raw)
|
|
||||||
self._validate_password(password)
|
|
||||||
self.headers = self._parse_dict(parts[0])
|
|
||||||
|
|
||||||
for i, part in enumerate(parts):
|
|
||||||
if i == 0:
|
|
||||||
continue # Skip Header
|
|
||||||
if part.strip() == '':
|
|
||||||
continue
|
|
||||||
notice = self._parse_dict(part)
|
|
||||||
if notice.get('Notification-Name', False):
|
|
||||||
self.notifications.append(notice)
|
|
||||||
elif notice.get('Identifier', False):
|
|
||||||
notice['Data'] = self._decode_binary(part, notice)
|
|
||||||
#open('register.png','wblol').write(notice['Data'])
|
|
||||||
self.resources[notice.get('Identifier')] = notice
|
|
||||||
|
|
||||||
def add_notification(self, name, enabled=True):
|
|
||||||
"""Add new Notification to Registration message
|
|
||||||
|
|
||||||
:param string name: Notification Name
|
|
||||||
:param boolean enabled: Enable this notification by default
|
|
||||||
"""
|
|
||||||
notice = {}
|
|
||||||
notice['Notification-Name'] = name
|
|
||||||
notice['Notification-Enabled'] = enabled
|
|
||||||
|
|
||||||
self.notifications.append(notice)
|
|
||||||
self.add_header('Notifications-Count', len(self.notifications))
|
|
||||||
|
|
||||||
def encode(self):
|
|
||||||
"""Encode a GNTP Registration Message
|
|
||||||
|
|
||||||
:return string: Encoded GNTP Registration message. Returned as a byte string
|
|
||||||
"""
|
|
||||||
|
|
||||||
buff = _GNTPBuffer()
|
|
||||||
|
|
||||||
buff.writeln(self._format_info())
|
|
||||||
|
|
||||||
#Headers
|
|
||||||
for k, v in self.headers.items():
|
|
||||||
buff.writeheader(k, v)
|
|
||||||
buff.writeln()
|
|
||||||
|
|
||||||
#Notifications
|
|
||||||
if len(self.notifications) > 0:
|
|
||||||
for notice in self.notifications:
|
|
||||||
for k, v in notice.items():
|
|
||||||
buff.writeheader(k, v)
|
|
||||||
buff.writeln()
|
|
||||||
|
|
||||||
#Resources
|
|
||||||
for resource, data in self.resources.items():
|
|
||||||
buff.writeheader('Identifier', resource)
|
|
||||||
buff.writeheader('Length', len(data))
|
|
||||||
buff.writeln()
|
|
||||||
buff.write(data)
|
|
||||||
buff.writeln()
|
|
||||||
buff.writeln()
|
|
||||||
|
|
||||||
return buff.getvalue()
|
|
||||||
|
|
||||||
|
|
||||||
class GNTPNotice(_GNTPBase):
|
|
||||||
"""Represents a GNTP Notification Command
|
|
||||||
|
|
||||||
:param string data: (Optional) See decode()
|
|
||||||
:param string app: (Optional) Set Application-Name
|
|
||||||
:param string name: (Optional) Set Notification-Name
|
|
||||||
:param string title: (Optional) Set Notification Title
|
|
||||||
:param string password: (Optional) Password to use while encoding/decoding messages
|
|
||||||
"""
|
|
||||||
_requiredHeaders = [
|
|
||||||
'Application-Name',
|
|
||||||
'Notification-Name',
|
|
||||||
'Notification-Title'
|
|
||||||
]
|
|
||||||
|
|
||||||
def __init__(self, data=None, app=None, name=None, title=None, password=None):
|
|
||||||
_GNTPBase.__init__(self, 'NOTIFY')
|
|
||||||
|
|
||||||
if data:
|
|
||||||
self.decode(data, password)
|
|
||||||
else:
|
|
||||||
self.set_password(password)
|
|
||||||
if app:
|
|
||||||
self.add_header('Application-Name', app)
|
|
||||||
if name:
|
|
||||||
self.add_header('Notification-Name', name)
|
|
||||||
if title:
|
|
||||||
self.add_header('Notification-Title', title)
|
|
||||||
|
|
||||||
def decode(self, data, password):
|
|
||||||
"""Decode existing GNTP Notification message
|
|
||||||
|
|
||||||
:param string data: Message to decode.
|
|
||||||
"""
|
|
||||||
self.raw = shim.u(data)
|
|
||||||
parts = self.raw.split('\r\n\r\n')
|
|
||||||
self.info = self._parse_info(self.raw)
|
|
||||||
self._validate_password(password)
|
|
||||||
self.headers = self._parse_dict(parts[0])
|
|
||||||
|
|
||||||
for i, part in enumerate(parts):
|
|
||||||
if i == 0:
|
|
||||||
continue # Skip Header
|
|
||||||
if part.strip() == '':
|
|
||||||
continue
|
|
||||||
notice = self._parse_dict(part)
|
|
||||||
if notice.get('Identifier', False):
|
|
||||||
notice['Data'] = self._decode_binary(part, notice)
|
|
||||||
#open('notice.png','wblol').write(notice['Data'])
|
|
||||||
self.resources[notice.get('Identifier')] = notice
|
|
||||||
|
|
||||||
|
|
||||||
class GNTPSubscribe(_GNTPBase):
|
|
||||||
"""Represents a GNTP Subscribe Command
|
|
||||||
|
|
||||||
:param string data: (Optional) See decode()
|
|
||||||
:param string password: (Optional) Password to use while encoding/decoding messages
|
|
||||||
"""
|
|
||||||
_requiredHeaders = [
|
|
||||||
'Subscriber-ID',
|
|
||||||
'Subscriber-Name',
|
|
||||||
]
|
|
||||||
|
|
||||||
def __init__(self, data=None, password=None):
|
|
||||||
_GNTPBase.__init__(self, 'SUBSCRIBE')
|
|
||||||
if data:
|
|
||||||
self.decode(data, password)
|
|
||||||
else:
|
|
||||||
self.set_password(password)
|
|
||||||
|
|
||||||
|
|
||||||
class GNTPOK(_GNTPBase):
|
|
||||||
"""Represents a GNTP OK Response
|
|
||||||
|
|
||||||
:param string data: (Optional) See _GNTPResponse.decode()
|
|
||||||
:param string action: (Optional) Set type of action the OK Response is for
|
|
||||||
"""
|
|
||||||
_requiredHeaders = ['Response-Action']
|
|
||||||
|
|
||||||
def __init__(self, data=None, action=None):
|
|
||||||
_GNTPBase.__init__(self, '-OK')
|
|
||||||
if data:
|
|
||||||
self.decode(data)
|
|
||||||
if action:
|
|
||||||
self.add_header('Response-Action', action)
|
|
||||||
|
|
||||||
|
|
||||||
class GNTPError(_GNTPBase):
|
|
||||||
"""Represents a GNTP Error response
|
|
||||||
|
|
||||||
:param string data: (Optional) See _GNTPResponse.decode()
|
|
||||||
:param string errorcode: (Optional) Error code
|
|
||||||
:param string errordesc: (Optional) Error Description
|
|
||||||
"""
|
|
||||||
_requiredHeaders = ['Error-Code', 'Error-Description']
|
|
||||||
|
|
||||||
def __init__(self, data=None, errorcode=None, errordesc=None):
|
|
||||||
_GNTPBase.__init__(self, '-ERROR')
|
|
||||||
if data:
|
|
||||||
self.decode(data)
|
|
||||||
if errorcode:
|
|
||||||
self.add_header('Error-Code', errorcode)
|
|
||||||
self.add_header('Error-Description', errordesc)
|
|
||||||
|
|
||||||
def error(self):
|
|
||||||
return (self.headers.get('Error-Code', None),
|
|
||||||
self.headers.get('Error-Description', None))
|
|
||||||
|
|
||||||
|
|
||||||
def parse_gntp(data, password=None):
|
|
||||||
"""Attempt to parse a message as a GNTP message
|
|
||||||
|
|
||||||
:param string data: Message to be parsed
|
|
||||||
:param string password: Optional password to be used to verify the message
|
|
||||||
"""
|
|
||||||
data = shim.u(data)
|
|
||||||
match = GNTP_INFO_LINE_SHORT.match(data)
|
|
||||||
if not match:
|
|
||||||
raise errors.ParseError('INVALID_GNTP_INFO')
|
|
||||||
info = match.groupdict()
|
|
||||||
if info['messagetype'] == 'REGISTER':
|
|
||||||
return GNTPRegister(data, password=password)
|
|
||||||
elif info['messagetype'] == 'NOTIFY':
|
|
||||||
return GNTPNotice(data, password=password)
|
|
||||||
elif info['messagetype'] == 'SUBSCRIBE':
|
|
||||||
return GNTPSubscribe(data, password=password)
|
|
||||||
elif info['messagetype'] == '-OK':
|
|
||||||
return GNTPOK(data)
|
|
||||||
elif info['messagetype'] == '-ERROR':
|
|
||||||
return GNTPError(data)
|
|
||||||
raise errors.ParseError('INVALID_GNTP_MESSAGE')
|
|
@ -1,25 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
class BaseError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ParseError(BaseError):
|
|
||||||
errorcode = 500
|
|
||||||
errordesc = 'Error parsing the message'
|
|
||||||
|
|
||||||
|
|
||||||
class AuthError(BaseError):
|
|
||||||
errorcode = 400
|
|
||||||
errordesc = 'Error with authorization'
|
|
||||||
|
|
||||||
|
|
||||||
class UnsupportedError(BaseError):
|
|
||||||
errorcode = 500
|
|
||||||
errordesc = 'Currently unsupported by gntp.py'
|
|
||||||
|
|
||||||
|
|
||||||
class NetworkError(BaseError):
|
|
||||||
errorcode = 500
|
|
||||||
errordesc = "Error connecting to growl server"
|
|
@ -1,265 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
"""
|
|
||||||
The gntp.notifier module is provided as a simple way to send notifications
|
|
||||||
using GNTP
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
This class is intended to mostly mirror the older Python bindings such
|
|
||||||
that you should be able to replace instances of the old bindings with
|
|
||||||
this class.
|
|
||||||
`Original Python bindings <http://code.google.com/p/growl/source/browse/Bindings/python/Growl.py>`_
|
|
||||||
|
|
||||||
"""
|
|
||||||
import logging
|
|
||||||
import platform
|
|
||||||
import socket
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from .version import __version__
|
|
||||||
from . import core
|
|
||||||
from . import errors as errors
|
|
||||||
from . import shim
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
'mini',
|
|
||||||
'GrowlNotifier',
|
|
||||||
]
|
|
||||||
|
|
||||||
logger = logging.getLogger('gntp')
|
|
||||||
|
|
||||||
|
|
||||||
class GrowlNotifier(object):
|
|
||||||
"""Helper class to simplfy sending Growl messages
|
|
||||||
|
|
||||||
:param string applicationName: Sending application name
|
|
||||||
:param list notification: List of valid notifications
|
|
||||||
:param list defaultNotifications: List of notifications that should be enabled
|
|
||||||
by default
|
|
||||||
:param string applicationIcon: Icon URL
|
|
||||||
:param string hostname: Remote host
|
|
||||||
:param integer port: Remote port
|
|
||||||
"""
|
|
||||||
|
|
||||||
passwordHash = 'MD5'
|
|
||||||
socketTimeout = 3
|
|
||||||
|
|
||||||
def __init__(self, applicationName='Python GNTP', notifications=[],
|
|
||||||
defaultNotifications=None, applicationIcon=None, hostname='localhost',
|
|
||||||
password=None, port=23053):
|
|
||||||
|
|
||||||
self.applicationName = applicationName
|
|
||||||
self.notifications = list(notifications)
|
|
||||||
if defaultNotifications:
|
|
||||||
self.defaultNotifications = list(defaultNotifications)
|
|
||||||
else:
|
|
||||||
self.defaultNotifications = self.notifications
|
|
||||||
self.applicationIcon = applicationIcon
|
|
||||||
|
|
||||||
self.password = password
|
|
||||||
self.hostname = hostname
|
|
||||||
self.port = int(port)
|
|
||||||
|
|
||||||
def _checkIcon(self, data):
|
|
||||||
'''
|
|
||||||
Check the icon to see if it's valid
|
|
||||||
|
|
||||||
If it's a simple URL icon, then we return True. If it's a data icon
|
|
||||||
then we return False
|
|
||||||
'''
|
|
||||||
logger.info('Checking icon')
|
|
||||||
return shim.u(data).startswith('http')
|
|
||||||
|
|
||||||
def register(self):
|
|
||||||
"""Send GNTP Registration
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
Before sending notifications to Growl, you need to have
|
|
||||||
sent a registration message at least once
|
|
||||||
"""
|
|
||||||
logger.info('Sending registration to %s:%s', self.hostname, self.port)
|
|
||||||
register = core.GNTPRegister()
|
|
||||||
register.add_header('Application-Name', self.applicationName)
|
|
||||||
for notification in self.notifications:
|
|
||||||
enabled = notification in self.defaultNotifications
|
|
||||||
register.add_notification(notification, enabled)
|
|
||||||
if self.applicationIcon:
|
|
||||||
if self._checkIcon(self.applicationIcon):
|
|
||||||
register.add_header('Application-Icon', self.applicationIcon)
|
|
||||||
else:
|
|
||||||
resource = register.add_resource(self.applicationIcon)
|
|
||||||
register.add_header('Application-Icon', resource)
|
|
||||||
if self.password:
|
|
||||||
register.set_password(self.password, self.passwordHash)
|
|
||||||
self.add_origin_info(register)
|
|
||||||
self.register_hook(register)
|
|
||||||
return self._send('register', register)
|
|
||||||
|
|
||||||
def notify(self, noteType, title, description, icon=None, sticky=False,
|
|
||||||
priority=None, callback=None, identifier=None, custom={}):
|
|
||||||
"""Send a GNTP notifications
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
Must have registered with growl beforehand or messages will be ignored
|
|
||||||
|
|
||||||
:param string noteType: One of the notification names registered earlier
|
|
||||||
:param string title: Notification title (usually displayed on the notification)
|
|
||||||
:param string description: The main content of the notification
|
|
||||||
:param string icon: Icon URL path
|
|
||||||
:param boolean sticky: Sticky notification
|
|
||||||
:param integer priority: Message priority level from -2 to 2
|
|
||||||
:param string callback: URL callback
|
|
||||||
:param dict custom: Custom attributes. Key names should be prefixed with X-
|
|
||||||
according to the spec but this is not enforced by this class
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
For now, only URL callbacks are supported. In the future, the
|
|
||||||
callback argument will also support a function
|
|
||||||
"""
|
|
||||||
logger.info('Sending notification [%s] to %s:%s', noteType, self.hostname, self.port)
|
|
||||||
assert noteType in self.notifications
|
|
||||||
notice = core.GNTPNotice()
|
|
||||||
notice.add_header('Application-Name', self.applicationName)
|
|
||||||
notice.add_header('Notification-Name', noteType)
|
|
||||||
notice.add_header('Notification-Title', title)
|
|
||||||
if self.password:
|
|
||||||
notice.set_password(self.password, self.passwordHash)
|
|
||||||
if sticky:
|
|
||||||
notice.add_header('Notification-Sticky', sticky)
|
|
||||||
if priority:
|
|
||||||
notice.add_header('Notification-Priority', priority)
|
|
||||||
if icon:
|
|
||||||
if self._checkIcon(icon):
|
|
||||||
notice.add_header('Notification-Icon', icon)
|
|
||||||
else:
|
|
||||||
resource = notice.add_resource(icon)
|
|
||||||
notice.add_header('Notification-Icon', resource)
|
|
||||||
|
|
||||||
if description:
|
|
||||||
notice.add_header('Notification-Text', description)
|
|
||||||
if callback:
|
|
||||||
notice.add_header('Notification-Callback-Target', callback)
|
|
||||||
if identifier:
|
|
||||||
notice.add_header('Notification-Coalescing-ID', identifier)
|
|
||||||
|
|
||||||
for key in custom:
|
|
||||||
notice.add_header(key, custom[key])
|
|
||||||
|
|
||||||
self.add_origin_info(notice)
|
|
||||||
self.notify_hook(notice)
|
|
||||||
|
|
||||||
return self._send('notify', notice)
|
|
||||||
|
|
||||||
def subscribe(self, id, name, port):
|
|
||||||
"""Send a Subscribe request to a remote machine"""
|
|
||||||
sub = core.GNTPSubscribe()
|
|
||||||
sub.add_header('Subscriber-ID', id)
|
|
||||||
sub.add_header('Subscriber-Name', name)
|
|
||||||
sub.add_header('Subscriber-Port', port)
|
|
||||||
if self.password:
|
|
||||||
sub.set_password(self.password, self.passwordHash)
|
|
||||||
|
|
||||||
self.add_origin_info(sub)
|
|
||||||
self.subscribe_hook(sub)
|
|
||||||
|
|
||||||
return self._send('subscribe', sub)
|
|
||||||
|
|
||||||
def add_origin_info(self, packet):
|
|
||||||
"""Add optional Origin headers to message"""
|
|
||||||
packet.add_header('Origin-Machine-Name', platform.node())
|
|
||||||
packet.add_header('Origin-Software-Name', 'gntp.py')
|
|
||||||
packet.add_header('Origin-Software-Version', __version__)
|
|
||||||
packet.add_header('Origin-Platform-Name', platform.system())
|
|
||||||
packet.add_header('Origin-Platform-Version', platform.platform())
|
|
||||||
|
|
||||||
def register_hook(self, packet):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def notify_hook(self, packet):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def subscribe_hook(self, packet):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _send(self, messagetype, packet):
|
|
||||||
"""Send the GNTP Packet"""
|
|
||||||
|
|
||||||
packet.validate()
|
|
||||||
data = packet.encode()
|
|
||||||
|
|
||||||
logger.debug('To : %s:%s <%s>\n%s', self.hostname, self.port, packet.__class__, data)
|
|
||||||
|
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
s.settimeout(self.socketTimeout)
|
|
||||||
try:
|
|
||||||
s.connect((self.hostname, self.port))
|
|
||||||
s.send(data)
|
|
||||||
recv_data = s.recv(1024)
|
|
||||||
while not recv_data.endswith(shim.b("\r\n\r\n")):
|
|
||||||
recv_data += s.recv(1024)
|
|
||||||
except socket.error:
|
|
||||||
# Python2.5 and Python3 compatibile exception
|
|
||||||
exc = sys.exc_info()[1]
|
|
||||||
raise errors.NetworkError(exc)
|
|
||||||
|
|
||||||
response = core.parse_gntp(recv_data)
|
|
||||||
s.close()
|
|
||||||
|
|
||||||
logger.debug('From : %s:%s <%s>\n%s', self.hostname, self.port, response.__class__, response)
|
|
||||||
|
|
||||||
if type(response) == core.GNTPOK:
|
|
||||||
return True
|
|
||||||
logger.error('Invalid response: %s', response.error())
|
|
||||||
return response.error()
|
|
||||||
|
|
||||||
|
|
||||||
def mini(description, applicationName='PythonMini', noteType="Message",
|
|
||||||
title="Mini Message", applicationIcon=None, hostname='localhost',
|
|
||||||
password=None, port=23053, sticky=False, priority=None,
|
|
||||||
callback=None, notificationIcon=None, identifier=None,
|
|
||||||
notifierFactory=GrowlNotifier):
|
|
||||||
"""Single notification function
|
|
||||||
|
|
||||||
Simple notification function in one line. Has only one required parameter
|
|
||||||
and attempts to use reasonable defaults for everything else
|
|
||||||
:param string description: Notification message
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
For now, only URL callbacks are supported. In the future, the
|
|
||||||
callback argument will also support a function
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
growl = notifierFactory(
|
|
||||||
applicationName=applicationName,
|
|
||||||
notifications=[noteType],
|
|
||||||
defaultNotifications=[noteType],
|
|
||||||
applicationIcon=applicationIcon,
|
|
||||||
hostname=hostname,
|
|
||||||
password=password,
|
|
||||||
port=port,
|
|
||||||
)
|
|
||||||
result = growl.register()
|
|
||||||
if result is not True:
|
|
||||||
return result
|
|
||||||
|
|
||||||
return growl.notify(
|
|
||||||
noteType=noteType,
|
|
||||||
title=title,
|
|
||||||
description=description,
|
|
||||||
icon=notificationIcon,
|
|
||||||
sticky=sticky,
|
|
||||||
priority=priority,
|
|
||||||
callback=callback,
|
|
||||||
identifier=identifier,
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
# We want the "mini" function to be simple and swallow Exceptions
|
|
||||||
# in order to be less invasive
|
|
||||||
logger.exception("Growl error")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# If we're running this module directly we're likely running it as a test
|
|
||||||
# so extra debugging is useful
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
mini('Testing mini notification')
|
|
@ -1,45 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
"""
|
|
||||||
Python2.5 and Python3.3 compatibility shim
|
|
||||||
|
|
||||||
Heavily inspirted by the "six" library.
|
|
||||||
https://pypi.python.org/pypi/six
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
|
|
||||||
PY3 = sys.version_info[0] == 3
|
|
||||||
|
|
||||||
if PY3:
|
|
||||||
def b(s):
|
|
||||||
if isinstance(s, bytes):
|
|
||||||
return s
|
|
||||||
return s.encode('utf8', 'replace')
|
|
||||||
|
|
||||||
def u(s):
|
|
||||||
if isinstance(s, bytes):
|
|
||||||
return s.decode('utf8', 'replace')
|
|
||||||
return s
|
|
||||||
|
|
||||||
from io import BytesIO as StringIO
|
|
||||||
from configparser import RawConfigParser
|
|
||||||
else:
|
|
||||||
def b(s):
|
|
||||||
if isinstance(s, unicode): # noqa
|
|
||||||
return s.encode('utf8', 'replace')
|
|
||||||
return s
|
|
||||||
|
|
||||||
def u(s):
|
|
||||||
if isinstance(s, unicode): # noqa
|
|
||||||
return s
|
|
||||||
if isinstance(s, int):
|
|
||||||
s = str(s)
|
|
||||||
return unicode(s, "utf8", "replace") # noqa
|
|
||||||
|
|
||||||
from StringIO import StringIO
|
|
||||||
from ConfigParser import RawConfigParser
|
|
||||||
|
|
||||||
b.__doc__ = "Ensure we have a byte string"
|
|
||||||
u.__doc__ = "Ensure we have a unicode string"
|
|
@ -1,4 +0,0 @@
|
|||||||
# Copyright: 2013 Paul Traylor
|
|
||||||
# These sources are released under the terms of the MIT license: see LICENSE
|
|
||||||
|
|
||||||
__version__ = '1.0.2'
|
|
Loading…
Reference in new issue