"""This module contains several handy functions primarily meant for internal use.""" from __future__ import division from asyncio import iscoroutinefunction from datetime import date, datetime, time, timedelta, tzinfo from calendar import timegm from functools import partial from inspect import isbuiltin, isclass, isfunction, ismethod import re import sys from pytz import timezone, utc, FixedOffset import six try: from inspect import signature except ImportError: # pragma: nocover from funcsigs import signature try: from threading import TIMEOUT_MAX except ImportError: TIMEOUT_MAX = 4294967 # Maximum value accepted by Event.wait() on Windows __all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp', 'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args', 'normalize', 'localize', 'TIMEOUT_MAX') class _Undefined(object): def __nonzero__(self): return False def __bool__(self): return False def __repr__(self): return '<undefined>' undefined = _Undefined() #: a unique object that only signifies that no value is defined def asint(text): """ Safely converts a string to an integer, returning ``None`` if the string is ``None``. :type text: str :rtype: int """ if text is not None: return int(text) def asbool(obj): """ Interprets an object as a boolean value. :rtype: bool """ if isinstance(obj, str): obj = obj.strip().lower() if obj in ('true', 'yes', 'on', 'y', 't', '1'): return True if obj in ('false', 'no', 'off', 'n', 'f', '0'): return False raise ValueError('Unable to interpret value "%s" as boolean' % obj) return bool(obj) def astimezone(obj): """ Interprets an object as a timezone. :rtype: tzinfo """ if isinstance(obj, six.string_types): return timezone(obj) if isinstance(obj, tzinfo): if obj.tzname(None) == 'local': raise ValueError( 'Unable to determine the name of the local timezone -- you must explicitly ' 'specify the name of the local timezone. Please refrain from using timezones like ' 'EST to prevent problems with daylight saving time. Instead, use a locale based ' 'timezone name (such as Europe/Helsinki).') return obj if obj is not None: raise TypeError('Expected tzinfo, got %s instead' % obj.__class__.__name__) _DATE_REGEX = re.compile( r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})' r'(?:[ T](?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})' r'(?:\.(?P<microsecond>\d{1,6}))?' r'(?P<timezone>Z|[+-]\d\d:\d\d)?)?$') def convert_to_datetime(input, tz, arg_name): """ Converts the given object to a timezone aware datetime object. If a timezone aware datetime object is passed, it is returned unmodified. If a native datetime object is passed, it is given the specified timezone. If the input is a string, it is parsed as a datetime with the given timezone. Date strings are accepted in three different forms: date only (Y-m-d), date with time (Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro). Additionally you can override the time zone by giving a specific offset in the format specified by ISO 8601: Z (UTC), +HH:MM or -HH:MM. :param str|datetime input: the datetime or string to convert to a timezone aware datetime :param datetime.tzinfo tz: timezone to interpret ``input`` in :param str arg_name: the name of the argument (used in an error message) :rtype: datetime """ if input is None: return elif isinstance(input, datetime): datetime_ = input elif isinstance(input, date): datetime_ = datetime.combine(input, time()) elif isinstance(input, six.string_types): m = _DATE_REGEX.match(input) if not m: raise ValueError('Invalid date string') values = m.groupdict() tzname = values.pop('timezone') if tzname == 'Z': tz = utc elif tzname: hours, minutes = (int(x) for x in tzname[1:].split(':')) sign = 1 if tzname[0] == '+' else -1 tz = FixedOffset(sign * (hours * 60 + minutes)) values = {k: int(v or 0) for k, v in values.items()} datetime_ = datetime(**values) else: raise TypeError('Unsupported type for %s: %s' % (arg_name, input.__class__.__name__)) if datetime_.tzinfo is not None: return datetime_ if tz is None: raise ValueError( 'The "tz" argument must be specified if %s has no timezone information' % arg_name) if isinstance(tz, six.string_types): tz = timezone(tz) return localize(datetime_, tz) def datetime_to_utc_timestamp(timeval): """ Converts a datetime instance to a timestamp. :type timeval: datetime :rtype: float """ if timeval is not None: return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000 def utc_timestamp_to_datetime(timestamp): """ Converts the given timestamp to a datetime instance. :type timestamp: float :rtype: datetime """ if timestamp is not None: return datetime.fromtimestamp(timestamp, utc) def timedelta_seconds(delta): """ Converts the given timedelta to seconds. :type delta: timedelta :rtype: float """ return delta.days * 24 * 60 * 60 + delta.seconds + \ delta.microseconds / 1000000.0 def datetime_ceil(dateval): """ Rounds the given datetime object upwards. :type dateval: datetime """ if dateval.microsecond > 0: return dateval + timedelta(seconds=1, microseconds=-dateval.microsecond) return dateval def datetime_repr(dateval): return dateval.strftime('%Y-%m-%d %H:%M:%S %Z') if dateval else 'None' def get_callable_name(func): """ Returns the best available display name for the given function/callable. :rtype: str """ if ismethod(func): self = func.__self__ cls = self if isclass(self) else type(self) return f"{cls.__qualname__}.{func.__name__}" elif isclass(func) or isfunction(func) or isbuiltin(func): return func.__qualname__ elif hasattr(func, '__call__') and callable(func.__call__): # instance of a class with a __call__ method return type(func).__qualname__ raise TypeError('Unable to determine a name for %r -- maybe it is not a callable?' % func) def obj_to_ref(obj): """ Returns the path to the given callable. :rtype: str :raises TypeError: if the given object is not callable :raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested function """ if isinstance(obj, partial): raise ValueError('Cannot create a reference to a partial()') name = get_callable_name(obj) if '<lambda>' in name: raise ValueError('Cannot create a reference to a lambda') if '<locals>' in name: raise ValueError('Cannot create a reference to a nested function') if ismethod(obj): module = obj.__self__.__module__ else: module = obj.__module__ return '%s:%s' % (module, name) def ref_to_obj(ref): """ Returns the object pointed to by ``ref``. :type ref: str """ if not isinstance(ref, six.string_types): raise TypeError('References must be strings') if ':' not in ref: raise ValueError('Invalid reference') modulename, rest = ref.split(':', 1) try: obj = __import__(modulename, fromlist=[rest]) except ImportError: raise LookupError('Error resolving reference %s: could not import module' % ref) try: for name in rest.split('.'): obj = getattr(obj, name) return obj except Exception: raise LookupError('Error resolving reference %s: error looking up object' % ref) def maybe_ref(ref): """ Returns the object that the given reference points to, if it is indeed a reference. If it is not a reference, the object is returned as-is. """ if not isinstance(ref, str): return ref return ref_to_obj(ref) if six.PY2: def repr_escape(string): if isinstance(string, six.text_type): return string.encode('ascii', 'backslashreplace') return string else: def repr_escape(string): return string def check_callable_args(func, args, kwargs): """ Ensures that the given callable can be called with the given arguments. :type args: tuple :type kwargs: dict """ pos_kwargs_conflicts = [] # parameters that have a match in both args and kwargs positional_only_kwargs = [] # positional-only parameters that have a match in kwargs unsatisfied_args = [] # parameters in signature that don't have a match in args or kwargs unsatisfied_kwargs = [] # keyword-only arguments that don't have a match in kwargs unmatched_args = list(args) # args that didn't match any of the parameters in the signature # kwargs that didn't match any of the parameters in the signature unmatched_kwargs = list(kwargs) # indicates if the signature defines *args and **kwargs respectively has_varargs = has_var_kwargs = False try: if sys.version_info >= (3, 5): sig = signature(func, follow_wrapped=False) else: sig = signature(func) except ValueError: # signature() doesn't work against every kind of callable return for param in six.itervalues(sig.parameters): if param.kind == param.POSITIONAL_OR_KEYWORD: if param.name in unmatched_kwargs and unmatched_args: pos_kwargs_conflicts.append(param.name) elif unmatched_args: del unmatched_args[0] elif param.name in unmatched_kwargs: unmatched_kwargs.remove(param.name) elif param.default is param.empty: unsatisfied_args.append(param.name) elif param.kind == param.POSITIONAL_ONLY: if unmatched_args: del unmatched_args[0] elif param.name in unmatched_kwargs: unmatched_kwargs.remove(param.name) positional_only_kwargs.append(param.name) elif param.default is param.empty: unsatisfied_args.append(param.name) elif param.kind == param.KEYWORD_ONLY: if param.name in unmatched_kwargs: unmatched_kwargs.remove(param.name) elif param.default is param.empty: unsatisfied_kwargs.append(param.name) elif param.kind == param.VAR_POSITIONAL: has_varargs = True elif param.kind == param.VAR_KEYWORD: has_var_kwargs = True # Make sure there are no conflicts between args and kwargs if pos_kwargs_conflicts: raise ValueError('The following arguments are supplied in both args and kwargs: %s' % ', '.join(pos_kwargs_conflicts)) # Check if keyword arguments are being fed to positional-only parameters if positional_only_kwargs: raise ValueError('The following arguments cannot be given as keyword arguments: %s' % ', '.join(positional_only_kwargs)) # Check that the number of positional arguments minus the number of matched kwargs matches the # argspec if unsatisfied_args: raise ValueError('The following arguments have not been supplied: %s' % ', '.join(unsatisfied_args)) # Check that all keyword-only arguments have been supplied if unsatisfied_kwargs: raise ValueError( 'The following keyword-only arguments have not been supplied in kwargs: %s' % ', '.join(unsatisfied_kwargs)) # Check that the callable can accept the given number of positional arguments if not has_varargs and unmatched_args: raise ValueError( 'The list of positional arguments is longer than the target callable can handle ' '(allowed: %d, given in args: %d)' % (len(args) - len(unmatched_args), len(args))) # Check that the callable can accept the given keyword arguments if not has_var_kwargs and unmatched_kwargs: raise ValueError( 'The target callable does not accept the following keyword arguments: %s' % ', '.join(unmatched_kwargs)) def iscoroutinefunction_partial(f): while isinstance(f, partial): f = f.func # The asyncio version of iscoroutinefunction includes testing for @coroutine # decorations vs. the inspect version which does not. return iscoroutinefunction(f) def normalize(dt): return datetime.fromtimestamp(dt.timestamp(), dt.tzinfo) def localize(dt, tzinfo): if hasattr(tzinfo, 'localize'): return tzinfo.localize(dt) return normalize(dt.replace(tzinfo=tzinfo))