You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
239 lines
7.3 KiB
239 lines
7.3 KiB
import typing
|
|
from logging import NullHandler, getLogger
|
|
|
|
# Module logger. A NullHandler is attached so that records emitted here are
# swallowed unless the embedding application configures logging itself.
logger = getLogger(__name__)
logger.addHandler(NullHandler())

# Generic placeholder for the value type handled by a property or rule.
T = typing.TypeVar('T')

# Translation table for str.translate(): every code point from 0 to 31 maps
# to None, which makes translate() delete those characters.
_visible_chars_table = {code_point: None for code_point in range(32)}
|
|
|
|
|
|
def _is_unknown(value: typing.Any) -> bool:
    """Tell whether *value* is a string that carries no information.

    Only ``str`` instances can be unknown: the empty string and any casing
    of ``'unknown'`` qualify. Every other object yields ``False``.
    """
    if not isinstance(value, str):
        return False
    if not value:
        return True
    return value.lower() == 'unknown'
|
|
|
|
|
|
class Reportable(typing.Generic[T]):
    """Base class for objects that can report values they could not handle."""

    def __init__(
        self,
        *args: str,
        description: typing.Optional[str] = None,
        reportable: bool = True,
    ):
        """Store the names, the optional description and the reporting flag.

        :param args: names this object is known by.
        :param description: explicit label; when falsy the label is built
            from the names.
        :param reportable: when false, :meth:`report` does nothing.
        """
        self.reportable = reportable
        self._description = description
        self.names = args

    @property
    def description(self) -> str:
        """Explicit description if one was given, else the names joined by ``|``."""
        if self._description:
            return self._description
        return '|'.join(self.names)

    def report(self, value: typing.Union[str, T], context: typing.MutableMapping) -> None:
        """Record *value* as invalid and log it.

        Nothing happens for a falsy value or when reporting is disabled.
        When *context* has a ``'report'`` entry, the value is stored under
        this object's description together with ``context['path']``; a value
        that is already recorded is left untouched.
        """
        if not (value and self.reportable):
            return

        label = self.description
        if 'report' in context:
            seen = context['report'].setdefault(label, {})
            if value not in seen:
                seen[value] = context['path']
        logger.info('Invalid %s: %r', label, value)
|
|
|
|
|
|
class Property(Reportable[T]):
    """Value that is extracted from a track mapping under one of several names."""

    def __init__(
        self,
        *args: str,
        default: typing.Optional[T] = None,
        private: bool = False,
        description: typing.Optional[str] = None,
        delimiter: str = ' / ',
        **kwargs,
    ):
        """Init method.

        :param args: candidate key names, tried in order. A name with exactly
            one dot (``'parent.child'``) addresses a key inside a nested mapping.
        :param default: value used when a name is not found in the track.
        :param private: stored on the instance as ``self.private``.
        :param description: forwarded to :class:`Reportable`.
        :param delimiter: stored on the instance as ``self.delimiter``.
        :param kwargs: forwarded to :class:`Reportable`.
        """
        super().__init__(*args, description=description, **kwargs)
        self.default = default
        self.private = private
        # Used to detect duplicated values. e.g.: en / en or High@L4.0 / High@L4.0 or Progressive / Progressive
        # NOTE(review): _deduplicate() below splits on the literal ' / ' and
        # does not read this attribute; the two only agree for the default.
        self.delimiter = delimiter

    @classmethod
    def _extract_value(cls,
                       track: typing.Mapping,
                       name: str,
                       names: typing.List[str]) -> typing.Any:
        """Look up *name* in *track*, falling back to the upper-cased key.

        :param track: mapping to read from.
        :param name: the full property name.
        :param names: *name* split on ``'.'``. With exactly two parts the
            first selects a nested mapping and the second a key inside it;
            any other length is looked up as a flat key.
        :return: the stored value, or ``None`` when nothing is found.
        """
        if len(names) == 2:
            parent_key, child_key = names
            parent_value = track.get(parent_key, track.get(parent_key.upper(), {}))
            # The parent key may hold something that is not a mapping (None,
            # a plain string, ...). Treat that as "not found" instead of
            # failing with AttributeError, so that the default value and the
            # remaining candidate names still get their chance.
            child_getter = getattr(parent_value, 'get', None)
            if child_getter is None:
                return None
            return child_getter(child_key, child_getter(child_key.upper()))

        return track.get(name, track.get(name.upper()))

    def extract_value(
        self,
        track: typing.Mapping,
        context: typing.MutableMapping,
    ) -> typing.Optional[T]:
        """Extract the property value from a given track.

        Each name is tried in order. A missing value is replaced by the
        default (or the name is skipped when there is no default). Bytes are
        decoded, strings are cleaned with ``_visible_chars_table``, stripped
        and deduplicated, and unknown strings are skipped. The first result of
        :meth:`handle` that is neither ``None`` nor unknown is returned.

        :return: the handled value, or ``None`` when no name produced one.
        """
        for name in self.names:
            names = name.split('.')
            value = self._extract_value(track, name, names)
            if value is None:
                if self.default is None:
                    continue

                value = self.default

            if isinstance(value, bytes):
                value = value.decode()

            if isinstance(value, str):
                value = value.translate(_visible_chars_table).strip()
                if _is_unknown(value):
                    continue
                value = self._deduplicate(value)

            result = self.handle(value, context)
            if result is not None and not _is_unknown(result):
                return result

        return None

    @classmethod
    def _deduplicate(cls, value: str) -> str:
        """Collapse ``'x / x'`` into ``'x'``; any other string is returned unchanged."""
        values = value.split(' / ')
        if len(values) == 2 and values[0] == values[1]:
            return values[0]
        return value

    def handle(self, value: T, context: typing.MutableMapping) -> typing.Optional[T]:
        """Return the value without any modification."""
        return value
|
|
|
|
|
|
class Configurable(Property[T]):
    """Property whose raw values are translated through a config mapping."""

    def __init__(self, config: typing.Mapping[str, typing.Mapping], *args: str,
                 config_key: typing.Optional[str] = None, **kwargs):
        """Init method.

        The mapping is read from the attribute of *config* named by
        *config_key*, or by this class's name when no key is given. A falsy
        *config* results in an empty mapping.
        """
        super().__init__(*args, **kwargs)
        if config:
            section = config_key or self.__class__.__name__
            self.mapping = getattr(config, section)
        else:
            self.mapping = {}

    @classmethod
    def _extract_key(cls, value: str) -> typing.Union[str, bool]:
        """Build the mapping key for *value*: its upper-cased form."""
        return value.upper()

    @classmethod
    def _extract_fallback_key(cls, value: str, key: str) -> typing.Optional[T]:
        """Return the next key to try after *key* missed; this base class has none."""
        return None

    def _lookup(
        self,
        key: str,
        context: typing.MutableMapping,
    ) -> typing.Union[T, None, bool]:
        """Resolve *key* for the profile in *context*.

        :return: ``None`` when the key is not in the mapping, ``False`` when
            the entry resolves to ``'__ignored__'``, else the attribute of
            the entry named by ``context['profile']`` (``'default'`` when
            that is missing or falsy).
        """
        entry = self.mapping.get(key)
        if entry is None:
            return None

        profile = context.get('profile') or 'default'
        resolved = getattr(entry, profile)
        if resolved != '__ignored__':
            return resolved
        return False

    def handle(self, value, context):
        """Return Variable or Constant.

        The key derived from *value* is looked up first, then each fallback
        key in turn while the lookup misses and a key is still available.
        An ignored entry (or a ``False`` key) ends the search with ``None``
        and no report. A value that never resolves is reported.
        """
        key = self._extract_key(value)
        if key is False:
            return None

        found = self._lookup(key, context)
        while True:
            if found is False:
                # Explicitly ignored: stop silently.
                return None
            if found or not key:
                break
            key = self._extract_fallback_key(value, key)
            found = self._lookup(key, context)

        if not found:
            self.report(value, context)

        return found
|
|
|
|
|
|
class MultiValue(Property):
    """Property with multiple values."""

    def __init__(self, prop: typing.Optional[Property] = None, delimiter='/', single=False,
                 handler=None, name=None, **kwargs):
        """Init method.

        :param prop: wrapped property; supplies the names and, unless
            *handler* is given, the function applied to each value.
        :param delimiter: separator used to split a string into items.
        :param single: when true, only the first item is handled.
        :param handler: callable ``(value, context)`` used instead of
            ``prop.handle``.
        :param name: property name used when *prop* is not given.
        :param kwargs: forwarded to :class:`Property`.
        """
        super().__init__(*(prop.names if prop else (name,)), **kwargs)
        self.prop = prop
        self.delimiter = delimiter
        self.single = single
        self.handler = handler

    def handle(
        self,
        value: str,
        context: typing.MutableMapping,
    ) -> typing.Union[T, typing.List[T], None]:
        """Handle properties with multiple values.

        The whole value is tried first. If that yields nothing, the value is
        split into items (a list with several entries is used as is; a
        one-entry list or any other value is split on ``self.delimiter``) and
        each item that is not unknown is handled separately.

        :return: the result for the whole value, a list of per-item results,
            the result for the first item, or ``None`` for an empty list.
        :raises NotImplementedError: when neither a handler nor a wrapped
            property is available.
        """
        if self.handler:
            call = self.handler
        elif self.prop:
            call = self.prop.handle
        else:
            raise NotImplementedError('No handler available')

        result = call(value, context)
        if result is not None:
            return result

        if isinstance(value, list):
            values = self._split(value[0], self.delimiter) if len(value) == 1 else value
        else:
            values = self._split(value, self.delimiter)

        if values is None:
            return call(values, context)
        if not values:
            # An empty list has no first item to fall back on; indexing it
            # below would raise IndexError.
            return None
        if len(values) > 1 and not self.single:
            results = []
            for item in values:
                if _is_unknown(item):
                    continue
                handled = call(item, context)
                if handled is not None:
                    results.append(handled)
            if results:
                return results
        return call(values[0], context)

    @classmethod
    def _split(
        cls,
        value: typing.Optional[T],
        delimiter: str = '/',
    ) -> typing.Optional[typing.List[str]]:
        """Split ``str(value)`` on *delimiter* and strip each part.

        :return: the list of parts, or ``None`` when *value* is ``None``.
        """
        if value is None:
            return None

        return [x.strip() for x in str(value).split(delimiter)]
|
|
|
|
|
|
class Rule(Reportable[T]):
    """Base class for rules; subclasses provide :meth:`execute`."""

    def __init__(self, name: str, private=False, override=False, **kwargs):
        """Store the rule flags and register *name* with :class:`Reportable`.

        :param name: the single name of this rule.
        :param private: stored on the instance as ``self.private``.
        :param override: stored on the instance as ``self.override``.
        :param kwargs: forwarded to :class:`Reportable`.
        """
        self.override = override
        self.private = private
        super().__init__(name, **kwargs)

    def execute(self, props, pv_props, context: typing.Mapping):
        """Run the rule; must be implemented by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
|