# -*- coding: utf-8 -*-
"""EBML element model and stream parser driven by a specs mapping."""
from ...exceptions import ReadError
from .readers import *
from pkg_resources import resource_stream  # @UnresolvedImport
from xml.dom import minidom
import logging


__all__ = ['INTEGER', 'UINTEGER', 'FLOAT', 'STRING', 'UNICODE', 'DATE', 'MASTER', 'BINARY',
           'SPEC_TYPES', 'READERS', 'Element', 'MasterElement', 'parse', 'parse_element',
           'get_matroska_specs']
logger = logging.getLogger(__name__)


# EBML types
INTEGER, UINTEGER, FLOAT, STRING, UNICODE, DATE, MASTER, BINARY = range(8)

# Spec types to EBML types mapping
SPEC_TYPES = {
    'integer': INTEGER,
    'uinteger': UINTEGER,
    'float': FLOAT,
    'string': STRING,
    'utf-8': UNICODE,
    'date': DATE,
    'master': MASTER,
    'binary': BINARY
}

# Readers to use per EBML type (MASTER has no reader: its data is parsed recursively)
READERS = {
    INTEGER: read_element_integer,
    UINTEGER: read_element_uinteger,
    FLOAT: read_element_float,
    STRING: read_element_string,
    UNICODE: read_element_unicode,
    DATE: read_element_date,
    BINARY: read_element_binary
}


class BaseElement(object):
    """Minimal element: what can be known without looking the id up in the specs

    :param int id: id of the element
    :param int position: position of element's data
    :param int size: size of element's data
    :param data: data of the element

    """
    def __init__(self, id=None, position=None, size=None, data=None):
        self.id = id
        self.position = position
        self.size = size
        self.data = data


class Element(BaseElement):
    """Base object of EBML

    :param int id: id of the element, best represented as hexadecimal (0x18538067 for Matroska Segment element)
    :param type: type of the element
    :type type: :data:`INTEGER`, :data:`UINTEGER`, :data:`FLOAT`, :data:`STRING`, :data:`UNICODE`, :data:`DATE`, :data:`MASTER` or :data:`BINARY`
    :param string name: name of the element
    :param int level: level of the element
    :param int position: position of element's data
    :param int size: size of element's data
    :param data: data as read by the corresponding :data:`READERS`

    """
    def __init__(self, id=None, type=None, name=None, level=None, position=None, size=None, data=None):
        super(Element, self).__init__(id, position, size, data)
        self.type = type
        self.name = name
        self.level = level

    def __repr__(self):
        return '<%s [%s, %r]>' % (self.__class__.__name__, self.name, self.data)


class MasterElement(Element):
    """Element of type :data:`MASTER` that has a list of :class:`Element` as its data

    :param int id: id of the element, best represented as hexadecimal (0x18538067 for Matroska Segment element)
    :param string name: name of the element
    :param int level: level of the element
    :param int position: position of element's data
    :param int size: size of element's data
    :param data: child elements
    :type data: list of :class:`Element`

    :class:`MasterElement` implements some magic methods to ease manipulation. Thus, a MasterElement supports
    the `in` keyword to test for the presence of a child element by its name and gives access to it
    with a container getter::

        >>> ebml_element = parse(open('test1.mkv', 'rb'), get_matroska_specs())[0]
        >>> 'EBMLVersion' in ebml_element
        False
        >>> 'DocType' in ebml_element
        True
        >>> ebml_element['DocType']
        <Element [DocType, 'matroska']>

    A :class:`MasterElement` whose children were not loaded (`data` is `None`) behaves like
    one without any child for `in`, item access, iteration and :meth:`get`.

    """
    def __init__(self, id=None, name=None, level=None, position=None, size=None, data=None):
        super(MasterElement, self).__init__(id, MASTER, name, level, position, size, data)

    def _children(self):
        """Return the child elements, or an empty list when `data` is `None`"""
        return self.data if self.data is not None else []

    def load(self, stream, specs, ignore_element_types=None, ignore_element_names=None, max_level=None,
             include_element_names=None):
        """Load children :class:`Elements <Element>` from the `stream` according to the `specs`

        The `stream` is read from its current position for :attr:`size` bytes and the result
        of :func:`parse` replaces :attr:`data`.

        :param stream: file-like object from which to read
        :param dict specs: see :ref:`specs`
        :param list ignore_element_types: list of element types to ignore
        :param list ignore_element_names: list of element names to ignore
        :param int max_level: maximum level of elements
        :param list include_element_names: list of element names to include exclusively, so ignoring all other element names

        """
        self.data = parse(stream, specs, self.size, ignore_element_types, ignore_element_names, max_level,
                          include_element_names)

    def get(self, name, default=None):
        """Convenience method for ``master_element[name].data if name in master_element else default``

        :param string name: the name of the child to get
        :param default: default value if `name` is not in the :class:`MasterElement`
        :return: the data of the child :class:`Element` or `default`
        :raise ValueError: if the child is itself of type :data:`MASTER`
        :raise KeyError: if more than one child is named `name`

        """
        if name not in self:
            return default
        element = self[name]
        if element.type == MASTER:
            raise ValueError('%s is a MasterElement' % name)
        return element.data

    def __getitem__(self, key):
        # Integers index the children by position, anything else selects by name
        if isinstance(key, int):
            return self._children()[key]
        children = [e for e in self._children() if e.name == key]
        if not children:
            raise KeyError(key)
        if len(children) > 1:
            raise KeyError('More than 1 child with key %s (%d)' % (key, len(children)))
        return children[0]

    def __contains__(self, item):
        return any(e.name == item for e in self._children())

    def __iter__(self):
        return iter(self._children())


def _skip_element(stream, element):
    """Move `stream` past the data of `element` without reading it"""
    stream.seek(element.size, 1)  # whence=1: offset is relative to the current position


def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_names=None, max_level=None,
          include_element_names=None):
    """Parse a stream for `size` bytes according to the `specs`

    :param stream: file-like object from which to read
    :param size: maximum number of bytes to read, None to read all the stream
    :type size: int or None
    :param dict specs: see :ref:`specs`
    :param list ignore_element_types: list of element types to ignore
    :param list ignore_element_names: list of element names to ignore
    :param int max_level: maximum level of elements
    :param list include_element_names: list of element names to include exclusively, so ignoring all other element names
    :return: parsed data as a tree of :class:`~enzyme.parsers.ebml.core.Element`
    :rtype: list
    :raise ReadError: if reading fails while `size` is not None

    Elements whose id is not in the `specs`, ignored elements and elements excluded by
    `include_element_names` are skipped and do not appear in the result. A master element
    whose level is greater than or equal to `max_level` is returned without its children
    (its data is left untouched).

    .. note::
        If `size` is reached in the middle of an element, reading will continue
        until the element is fully parsed.

    """
    ignore_element_types = ignore_element_types if ignore_element_types is not None else []
    ignore_element_names = ignore_element_names if ignore_element_names is not None else []
    include_element_names = include_element_names if include_element_names is not None else []
    start = stream.tell()
    elements = []
    while size is None or stream.tell() - start < size:
        # The whole iteration is guarded: a ReadError may come from the element header,
        # from a reader or from a nested parse
        try:
            element = parse_element(stream, specs)
            if not hasattr(element, 'type'):
                # Bare BaseElement: the id is not in the specs, only its size is usable
                _skip_element(stream, element)
                continue
            if element.type is None:
                logger.error('Element with id 0x%x is not in the specs', element.id)
                _skip_element(stream, element)
                continue
            if element.type in ignore_element_types or element.name in ignore_element_names:
                logger.info('%s %s %s ignored', element.__class__.__name__, element.name, element.type)
                _skip_element(stream, element)
                continue
            if include_element_names and element.name not in include_element_names:
                _skip_element(stream, element)
                continue
            if element.type == MASTER:
                if max_level is not None and element.level >= max_level:
                    logger.info('Maximum level %d reached for children of %s %s',
                                max_level, element.__class__.__name__, element.name)
                    _skip_element(stream, element)
                else:
                    logger.debug('Loading child elements for %s %s with size %d',
                                 element.__class__.__name__, element.name, element.size)
                    element.data = parse(stream, specs, element.size, ignore_element_types,
                                         ignore_element_names, max_level, include_element_names)
            else:
                element.data = READERS[element.type](stream, element.size)
            elements.append(element)
        except ReadError:
            # With an explicit size a failed read is an error; without one it ends the parsing
            if size is not None:
                raise
            break
    return elements


def parse_element(stream, specs):
    """Extract a single :class:`Element` from the `stream` according to the `specs`

    Only the id and the size are read: the returned element has its position set to the
    start of its data and its data left unread.

    :param stream: file-like object from which to read
    :param dict specs: see :ref:`specs`
    :return: the parsed element, a bare :class:`BaseElement` if its id is not in the `specs`
    :rtype: :class:`Element`
    :raise ReadError: if the element id or the element size cannot be read

    """
    element_id = read_element_id(stream)
    if element_id is None:
        raise ReadError('Cannot read element id')
    element_size = read_element_size(stream)
    if element_size is None:
        raise ReadError('Cannot read element size')
    if element_id not in specs:
        return BaseElement(element_id, stream.tell(), element_size)
    element_type, element_name, element_level = specs[element_id]
    if element_type == MASTER:
        element = MasterElement(element_id, element_name, element_level, stream.tell(), element_size)
    else:
        element = Element(element_id, element_type, element_name, element_level, stream.tell(), element_size)
    return element


def get_matroska_specs(webm_only=False):
    """Get the Matroska specs

    The specs are built from the ``element`` nodes of the packaged ``specs/matroska.xml``
    resource, keyed by element id and mapping to a ``(type, name, level)`` tuple.

    :param bool webm_only: load *only* WebM specs
    :return: the specs in the appropriate format. See :ref:`specs`
    :rtype: dict

    """
    specs = {}
    with resource_stream(__name__, 'specs/matroska.xml') as resource:
        xmldoc = minidom.parse(resource)
        for element in xmldoc.getElementsByTagName('element'):
            is_webm = element.hasAttribute('webm') and element.getAttribute('webm') == '1'
            if not webm_only or is_webm:
                specs[int(element.getAttribute('id'), 16)] = (SPEC_TYPES[element.getAttribute('type')],
                                                              element.getAttribute('name'),
                                                              int(element.getAttribute('level')))
    return specs