|
|
|
"""This module contains several handy functions primarily meant for internal use."""
|
|
|
|
|
|
|
|
from __future__ import division
|
|
|
|
|
|
|
|
from asyncio import iscoroutinefunction
|
|
|
|
from datetime import date, datetime, time, timedelta, tzinfo
|
|
|
|
from calendar import timegm
|
|
|
|
from functools import partial
|
|
|
|
from inspect import isbuiltin, isclass, isfunction, ismethod
|
|
|
|
import re
|
|
|
|
import sys
|
|
|
|
|
|
|
|
from pytz import timezone, utc, FixedOffset
|
|
|
|
import six
|
|
|
|
|
|
|
|
try:
|
|
|
|
from inspect import signature
|
|
|
|
except ImportError: # pragma: nocover
|
|
|
|
from funcsigs import signature
|
|
|
|
|
|
|
|
try:
|
|
|
|
from threading import TIMEOUT_MAX
|
|
|
|
except ImportError:
|
|
|
|
TIMEOUT_MAX = 4294967 # Maximum value accepted by Event.wait() on Windows
|
|
|
|
|
|
|
|
__all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp',
|
|
|
|
'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name',
|
|
|
|
'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args',
|
|
|
|
'normalize', 'localize', 'TIMEOUT_MAX')
|
|
|
|
|
|
|
|
|
|
|
|
class _Undefined(object):
|
|
|
|
def __nonzero__(self):
|
|
|
|
return False
|
|
|
|
|
|
|
|
def __bool__(self):
|
|
|
|
return False
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return '<undefined>'
|
|
|
|
|
|
|
|
|
|
|
|
undefined = _Undefined() #: a unique object that only signifies that no value is defined
|
|
|
|
|
|
|
|
|
|
|
|
def asint(text):
|
|
|
|
"""
|
|
|
|
Safely converts a string to an integer, returning ``None`` if the string is ``None``.
|
|
|
|
|
|
|
|
:type text: str
|
|
|
|
:rtype: int
|
|
|
|
|
|
|
|
"""
|
|
|
|
if text is not None:
|
|
|
|
return int(text)
|
|
|
|
|
|
|
|
|
|
|
|
def asbool(obj):
|
|
|
|
"""
|
|
|
|
Interprets an object as a boolean value.
|
|
|
|
|
|
|
|
:rtype: bool
|
|
|
|
|
|
|
|
"""
|
|
|
|
if isinstance(obj, str):
|
|
|
|
obj = obj.strip().lower()
|
|
|
|
if obj in ('true', 'yes', 'on', 'y', 't', '1'):
|
|
|
|
return True
|
|
|
|
if obj in ('false', 'no', 'off', 'n', 'f', '0'):
|
|
|
|
return False
|
|
|
|
raise ValueError('Unable to interpret value "%s" as boolean' % obj)
|
|
|
|
return bool(obj)
|
|
|
|
|
|
|
|
|
|
|
|
def astimezone(obj):
|
|
|
|
"""
|
|
|
|
Interprets an object as a timezone.
|
|
|
|
|
|
|
|
:rtype: tzinfo
|
|
|
|
|
|
|
|
"""
|
|
|
|
if isinstance(obj, six.string_types):
|
|
|
|
return timezone(obj)
|
|
|
|
if isinstance(obj, tzinfo):
|
|
|
|
if obj.tzname(None) == 'local':
|
|
|
|
raise ValueError(
|
|
|
|
'Unable to determine the name of the local timezone -- you must explicitly '
|
|
|
|
'specify the name of the local timezone. Please refrain from using timezones like '
|
|
|
|
'EST to prevent problems with daylight saving time. Instead, use a locale based '
|
|
|
|
'timezone name (such as Europe/Helsinki).')
|
|
|
|
return obj
|
|
|
|
if obj is not None:
|
|
|
|
raise TypeError('Expected tzinfo, got %s instead' % obj.__class__.__name__)
|
|
|
|
|
|
|
|
|
|
|
|
_DATE_REGEX = re.compile(
|
|
|
|
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
|
|
|
|
r'(?:[ T](?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
|
|
|
|
r'(?:\.(?P<microsecond>\d{1,6}))?'
|
|
|
|
r'(?P<timezone>Z|[+-]\d\d:\d\d)?)?$')
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_datetime(input, tz, arg_name):
|
|
|
|
"""
|
|
|
|
Converts the given object to a timezone aware datetime object.
|
|
|
|
|
|
|
|
If a timezone aware datetime object is passed, it is returned unmodified.
|
|
|
|
If a native datetime object is passed, it is given the specified timezone.
|
|
|
|
If the input is a string, it is parsed as a datetime with the given timezone.
|
|
|
|
|
|
|
|
Date strings are accepted in three different forms: date only (Y-m-d), date with time
|
|
|
|
(Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro). Additionally you can
|
|
|
|
override the time zone by giving a specific offset in the format specified by ISO 8601:
|
|
|
|
Z (UTC), +HH:MM or -HH:MM.
|
|
|
|
|
|
|
|
:param str|datetime input: the datetime or string to convert to a timezone aware datetime
|
|
|
|
:param datetime.tzinfo tz: timezone to interpret ``input`` in
|
|
|
|
:param str arg_name: the name of the argument (used in an error message)
|
|
|
|
:rtype: datetime
|
|
|
|
|
|
|
|
"""
|
|
|
|
if input is None:
|
|
|
|
return
|
|
|
|
elif isinstance(input, datetime):
|
|
|
|
datetime_ = input
|
|
|
|
elif isinstance(input, date):
|
|
|
|
datetime_ = datetime.combine(input, time())
|
|
|
|
elif isinstance(input, six.string_types):
|
|
|
|
m = _DATE_REGEX.match(input)
|
|
|
|
if not m:
|
|
|
|
raise ValueError('Invalid date string')
|
|
|
|
|
|
|
|
values = m.groupdict()
|
|
|
|
tzname = values.pop('timezone')
|
|
|
|
if tzname == 'Z':
|
|
|
|
tz = utc
|
|
|
|
elif tzname:
|
|
|
|
hours, minutes = (int(x) for x in tzname[1:].split(':'))
|
|
|
|
sign = 1 if tzname[0] == '+' else -1
|
|
|
|
tz = FixedOffset(sign * (hours * 60 + minutes))
|
|
|
|
|
|
|
|
values = {k: int(v or 0) for k, v in values.items()}
|
|
|
|
datetime_ = datetime(**values)
|
|
|
|
else:
|
|
|
|
raise TypeError('Unsupported type for %s: %s' % (arg_name, input.__class__.__name__))
|
|
|
|
|
|
|
|
if datetime_.tzinfo is not None:
|
|
|
|
return datetime_
|
|
|
|
if tz is None:
|
|
|
|
raise ValueError(
|
|
|
|
'The "tz" argument must be specified if %s has no timezone information' % arg_name)
|
|
|
|
if isinstance(tz, six.string_types):
|
|
|
|
tz = timezone(tz)
|
|
|
|
|
|
|
|
return localize(datetime_, tz)
|
|
|
|
|
|
|
|
|
|
|
|
def datetime_to_utc_timestamp(timeval):
|
|
|
|
"""
|
|
|
|
Converts a datetime instance to a timestamp.
|
|
|
|
|
|
|
|
:type timeval: datetime
|
|
|
|
:rtype: float
|
|
|
|
|
|
|
|
"""
|
|
|
|
if timeval is not None:
|
|
|
|
return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000
|
|
|
|
|
|
|
|
|
|
|
|
def utc_timestamp_to_datetime(timestamp):
|
|
|
|
"""
|
|
|
|
Converts the given timestamp to a datetime instance.
|
|
|
|
|
|
|
|
:type timestamp: float
|
|
|
|
:rtype: datetime
|
|
|
|
|
|
|
|
"""
|
|
|
|
if timestamp is not None:
|
|
|
|
return datetime.fromtimestamp(timestamp, utc)
|
|
|
|
|
|
|
|
|
|
|
|
def timedelta_seconds(delta):
|
|
|
|
"""
|
|
|
|
Converts the given timedelta to seconds.
|
|
|
|
|
|
|
|
:type delta: timedelta
|
|
|
|
:rtype: float
|
|
|
|
|
|
|
|
"""
|
|
|
|
return delta.days * 24 * 60 * 60 + delta.seconds + \
|
|
|
|
delta.microseconds / 1000000.0
|
|
|
|
|
|
|
|
|
|
|
|
def datetime_ceil(dateval):
|
|
|
|
"""
|
|
|
|
Rounds the given datetime object upwards.
|
|
|
|
|
|
|
|
:type dateval: datetime
|
|
|
|
|
|
|
|
"""
|
|
|
|
if dateval.microsecond > 0:
|
|
|
|
return dateval + timedelta(seconds=1, microseconds=-dateval.microsecond)
|
|
|
|
return dateval
|
|
|
|
|
|
|
|
|
|
|
|
def datetime_repr(dateval):
|
|
|
|
return dateval.strftime('%Y-%m-%d %H:%M:%S %Z') if dateval else 'None'
|
|
|
|
|
|
|
|
|
|
|
|
def get_callable_name(func):
|
|
|
|
"""
|
|
|
|
Returns the best available display name for the given function/callable.
|
|
|
|
|
|
|
|
:rtype: str
|
|
|
|
|
|
|
|
"""
|
|
|
|
if ismethod(func):
|
|
|
|
self = func.__self__
|
|
|
|
cls = self if isclass(self) else type(self)
|
|
|
|
return f"{cls.__qualname__}.{func.__name__}"
|
|
|
|
elif isclass(func) or isfunction(func) or isbuiltin(func):
|
|
|
|
return func.__qualname__
|
|
|
|
elif hasattr(func, '__call__') and callable(func.__call__):
|
|
|
|
# instance of a class with a __call__ method
|
|
|
|
return type(func).__qualname__
|
|
|
|
|
|
|
|
raise TypeError('Unable to determine a name for %r -- maybe it is not a callable?' % func)
|
|
|
|
|
|
|
|
|
|
|
|
def obj_to_ref(obj):
|
|
|
|
"""
|
|
|
|
Returns the path to the given callable.
|
|
|
|
|
|
|
|
:rtype: str
|
|
|
|
:raises TypeError: if the given object is not callable
|
|
|
|
:raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested
|
|
|
|
function
|
|
|
|
|
|
|
|
"""
|
|
|
|
if isinstance(obj, partial):
|
|
|
|
raise ValueError('Cannot create a reference to a partial()')
|
|
|
|
|
|
|
|
name = get_callable_name(obj)
|
|
|
|
if '<lambda>' in name:
|
|
|
|
raise ValueError('Cannot create a reference to a lambda')
|
|
|
|
if '<locals>' in name:
|
|
|
|
raise ValueError('Cannot create a reference to a nested function')
|
|
|
|
|
|
|
|
if ismethod(obj):
|
|
|
|
module = obj.__self__.__module__
|
|
|
|
else:
|
|
|
|
module = obj.__module__
|
|
|
|
|
|
|
|
return '%s:%s' % (module, name)
|
|
|
|
|
|
|
|
|
|
|
|
def ref_to_obj(ref):
|
|
|
|
"""
|
|
|
|
Returns the object pointed to by ``ref``.
|
|
|
|
|
|
|
|
:type ref: str
|
|
|
|
|
|
|
|
"""
|
|
|
|
if not isinstance(ref, six.string_types):
|
|
|
|
raise TypeError('References must be strings')
|
|
|
|
if ':' not in ref:
|
|
|
|
raise ValueError('Invalid reference')
|
|
|
|
|
|
|
|
modulename, rest = ref.split(':', 1)
|
|
|
|
try:
|
|
|
|
obj = __import__(modulename, fromlist=[rest])
|
|
|
|
except ImportError:
|
|
|
|
raise LookupError('Error resolving reference %s: could not import module' % ref)
|
|
|
|
|
|
|
|
try:
|
|
|
|
for name in rest.split('.'):
|
|
|
|
obj = getattr(obj, name)
|
|
|
|
return obj
|
|
|
|
except Exception:
|
|
|
|
raise LookupError('Error resolving reference %s: error looking up object' % ref)
|
|
|
|
|
|
|
|
|
|
|
|
def maybe_ref(ref):
|
|
|
|
"""
|
|
|
|
Returns the object that the given reference points to, if it is indeed a reference.
|
|
|
|
If it is not a reference, the object is returned as-is.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if not isinstance(ref, str):
|
|
|
|
return ref
|
|
|
|
return ref_to_obj(ref)
|
|
|
|
|
|
|
|
|
|
|
|
if six.PY2:
|
|
|
|
def repr_escape(string):
|
|
|
|
if isinstance(string, six.text_type):
|
|
|
|
return string.encode('ascii', 'backslashreplace')
|
|
|
|
return string
|
|
|
|
else:
|
|
|
|
def repr_escape(string):
|
|
|
|
return string
|
|
|
|
|
|
|
|
|
|
|
|
def check_callable_args(func, args, kwargs):
|
|
|
|
"""
|
|
|
|
Ensures that the given callable can be called with the given arguments.
|
|
|
|
|
|
|
|
:type args: tuple
|
|
|
|
:type kwargs: dict
|
|
|
|
|
|
|
|
"""
|
|
|
|
pos_kwargs_conflicts = [] # parameters that have a match in both args and kwargs
|
|
|
|
positional_only_kwargs = [] # positional-only parameters that have a match in kwargs
|
|
|
|
unsatisfied_args = [] # parameters in signature that don't have a match in args or kwargs
|
|
|
|
unsatisfied_kwargs = [] # keyword-only arguments that don't have a match in kwargs
|
|
|
|
unmatched_args = list(args) # args that didn't match any of the parameters in the signature
|
|
|
|
# kwargs that didn't match any of the parameters in the signature
|
|
|
|
unmatched_kwargs = list(kwargs)
|
|
|
|
# indicates if the signature defines *args and **kwargs respectively
|
|
|
|
has_varargs = has_var_kwargs = False
|
|
|
|
|
|
|
|
try:
|
|
|
|
if sys.version_info >= (3, 5):
|
|
|
|
sig = signature(func, follow_wrapped=False)
|
|
|
|
else:
|
|
|
|
sig = signature(func)
|
|
|
|
except ValueError:
|
|
|
|
# signature() doesn't work against every kind of callable
|
|
|
|
return
|
|
|
|
|
|
|
|
for param in six.itervalues(sig.parameters):
|
|
|
|
if param.kind == param.POSITIONAL_OR_KEYWORD:
|
|
|
|
if param.name in unmatched_kwargs and unmatched_args:
|
|
|
|
pos_kwargs_conflicts.append(param.name)
|
|
|
|
elif unmatched_args:
|
|
|
|
del unmatched_args[0]
|
|
|
|
elif param.name in unmatched_kwargs:
|
|
|
|
unmatched_kwargs.remove(param.name)
|
|
|
|
elif param.default is param.empty:
|
|
|
|
unsatisfied_args.append(param.name)
|
|
|
|
elif param.kind == param.POSITIONAL_ONLY:
|
|
|
|
if unmatched_args:
|
|
|
|
del unmatched_args[0]
|
|
|
|
elif param.name in unmatched_kwargs:
|
|
|
|
unmatched_kwargs.remove(param.name)
|
|
|
|
positional_only_kwargs.append(param.name)
|
|
|
|
elif param.default is param.empty:
|
|
|
|
unsatisfied_args.append(param.name)
|
|
|
|
elif param.kind == param.KEYWORD_ONLY:
|
|
|
|
if param.name in unmatched_kwargs:
|
|
|
|
unmatched_kwargs.remove(param.name)
|
|
|
|
elif param.default is param.empty:
|
|
|
|
unsatisfied_kwargs.append(param.name)
|
|
|
|
elif param.kind == param.VAR_POSITIONAL:
|
|
|
|
has_varargs = True
|
|
|
|
elif param.kind == param.VAR_KEYWORD:
|
|
|
|
has_var_kwargs = True
|
|
|
|
|
|
|
|
# Make sure there are no conflicts between args and kwargs
|
|
|
|
if pos_kwargs_conflicts:
|
|
|
|
raise ValueError('The following arguments are supplied in both args and kwargs: %s' %
|
|
|
|
', '.join(pos_kwargs_conflicts))
|
|
|
|
|
|
|
|
# Check if keyword arguments are being fed to positional-only parameters
|
|
|
|
if positional_only_kwargs:
|
|
|
|
raise ValueError('The following arguments cannot be given as keyword arguments: %s' %
|
|
|
|
', '.join(positional_only_kwargs))
|
|
|
|
|
|
|
|
# Check that the number of positional arguments minus the number of matched kwargs matches the
|
|
|
|
# argspec
|
|
|
|
if unsatisfied_args:
|
|
|
|
raise ValueError('The following arguments have not been supplied: %s' %
|
|
|
|
', '.join(unsatisfied_args))
|
|
|
|
|
|
|
|
# Check that all keyword-only arguments have been supplied
|
|
|
|
if unsatisfied_kwargs:
|
|
|
|
raise ValueError(
|
|
|
|
'The following keyword-only arguments have not been supplied in kwargs: %s' %
|
|
|
|
', '.join(unsatisfied_kwargs))
|
|
|
|
|
|
|
|
# Check that the callable can accept the given number of positional arguments
|
|
|
|
if not has_varargs and unmatched_args:
|
|
|
|
raise ValueError(
|
|
|
|
'The list of positional arguments is longer than the target callable can handle '
|
|
|
|
'(allowed: %d, given in args: %d)' % (len(args) - len(unmatched_args), len(args)))
|
|
|
|
|
|
|
|
# Check that the callable can accept the given keyword arguments
|
|
|
|
if not has_var_kwargs and unmatched_kwargs:
|
|
|
|
raise ValueError(
|
|
|
|
'The target callable does not accept the following keyword arguments: %s' %
|
|
|
|
', '.join(unmatched_kwargs))
|
|
|
|
|
|
|
|
|
|
|
|
def iscoroutinefunction_partial(f):
|
|
|
|
while isinstance(f, partial):
|
|
|
|
f = f.func
|
|
|
|
|
|
|
|
# The asyncio version of iscoroutinefunction includes testing for @coroutine
|
|
|
|
# decorations vs. the inspect version which does not.
|
|
|
|
return iscoroutinefunction(f)
|
|
|
|
|
|
|
|
|
|
|
|
def normalize(dt):
|
|
|
|
return datetime.fromtimestamp(dt.timestamp(), dt.tzinfo)
|
|
|
|
|
|
|
|
|
|
|
|
def localize(dt, tzinfo):
|
|
|
|
if hasattr(tzinfo, 'localize'):
|
|
|
|
return tzinfo.localize(dt)
|
|
|
|
|
|
|
|
return normalize(dt.replace(tzinfo=tzinfo))
|