1599 lines
62 KiB
Python
1599 lines
62 KiB
Python
"""'''
|
|
EBMLite: A lightweight EBML parsing library. It is designed to crawl through
|
|
EBML files quickly and efficiently, and that's about it.
|
|
|
|
@todo: Complete EBML encoding. Specifically, make 'master' elements write
|
|
directly to the stream, rather than build bytearrays, so huge 'master'
|
|
elements can be handled. It appears that the official spec may prohibit
|
|
(or at least counter-indicate) multiple root elements. Possible
|
|
compromise until proper fix: handle root 'master' elements differently
|
|
than deeper ones, more like the current `Document`.
|
|
@todo: Validation. Enforce the hierarchy defined in each schema.
|
|
@todo: Optimize 'infinite' master elements (i.e `size` is `None`). See notes
|
|
in `MasterElement` class' method definitions.
|
|
@todo: Improved `MasterElement.__eq__()` method, possibly doing a recursive
|
|
crawl of both elements and comparing the actual contents, or iterating
|
|
over chunks of the raw binary data. Current implementation doesn't check
|
|
element contents, just ID and payload size (for speed).
|
|
@todo: Document-wide caching, for future handling of streamed data. Affects
|
|
the longer-term streaming to-do (listed below) and optimization of
|
|
'infinite' elements (listed above).
|
|
@todo: Clean up and standardize usage of the term 'size' versus 'length.'
|
|
@todo: General documentation (more detailed than the README) and examples.
|
|
@todo: Document the best way to load schemata in a PyInstaller executable.
|
|
|
|
@todo: (longer term) Consider making schema loading automatic based on the EBML
|
|
DocType, DocTypeVersion, and DocTypeReadVersion. Would mean a refactoring
|
|
of how schemata are loaded.
|
|
@todo: (longer term) Refactor to support streaming data. This will require
|
|
modifying the indexing and iterating methods of `Document`. Also affects
|
|
the document-wide caching to-do item, listed above.
|
|
@todo: (longer term) Support the official Schema definition format. Start by
|
|
adopting some of the attributes, specifically ``minOccurs`` and
|
|
``maxOccurs`` (they serve the function provided by the current
|
|
``mandatory`` and ``multiple`` attributes). Add ``range`` later.
|
|
Eventually, recognize official schemata when loading, like the system
|
|
currently handles legacy ``python-ebml`` schemata.
|
|
"""
|
|
__author__ = "David Randall Stokes, Connor Flanigan"
|
|
__copyright__ = "Copyright 2022, Mide Technology Corporation"
|
|
__credits__ = "David Randall Stokes, Connor Flanigan, Becker Awqatty, Derek Witt"
|
|
|
|
__all__ = ['BinaryElement', 'DateElement', 'Document', 'Element',
|
|
'FloatElement', 'IntegerElement', 'MasterElement', 'Schema',
|
|
'StringElement', 'UIntegerElement', 'UnicodeElement',
|
|
'UnknownElement', 'VoidElement', 'loadSchema', 'parseSchema']
|
|
|
|
from ast import literal_eval
|
|
from datetime import datetime
|
|
import errno
|
|
import importlib
|
|
from io import BytesIO, StringIO, IOBase
|
|
import os.path
|
|
from pathlib import Path
|
|
import re
|
|
import sys
|
|
import types
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from .decoding import readElementID, readElementSize
|
|
from .decoding import readFloat, readInt, readUInt, readDate
|
|
from .decoding import readString, readUnicode
|
|
from . import encoding
|
|
from . import schemata
|
|
|
|
# Dictionaries in Python 3.7+ are explicitly insert-ordered in all
|
|
# implementations. If older, continue to use `collections.OrderedDict`.
|
|
if sys.hexversion < 0x03070000:
|
|
from collections import OrderedDict as Dict
|
|
else:
|
|
Dict = dict
|
|
|
|
# Additionally, `importlib.resources.files` is new to 3.9 as well; this is
|
|
# part of a work-around.
|
|
if sys.hexversion < 0x03090000:
|
|
importlib_resources = None
|
|
else:
|
|
import importlib.resources as importlib_resources
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
# SCHEMA_PATH: A list of paths for schema XML files, similar to `sys.path`.
|
|
# When `loadSchema()` is used, it will search these paths, in order, to find
|
|
# the schema file.
|
|
SCHEMA_PATH = ['',
|
|
os.path.realpath(os.path.dirname(schemata.__file__))]
|
|
|
|
SCHEMA_PATH.extend(p for p in os.environ.get('EBMLITE_SCHEMA_PATH', '').split(os.path.pathsep)
|
|
if p not in SCHEMA_PATH)
|
|
|
|
# SCHEMATA: A dictionary of loaded schemata, keyed by filename. Used by
|
|
# `loadSchema()`. In most cases, SCHEMATA should not be otherwise modified.
|
|
SCHEMATA = {}
|
|
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
class Element(object):
|
|
""" Base class for all EBML elements. Each data type has its own subclass,
|
|
and these subclasses get subclassed when a Schema is read.
|
|
|
|
@cvar id: The element's EBML ID.
|
|
@cvar name: The element's name.
|
|
@cvar schema: The `Schema` to which this element belongs.
|
|
@cvar multiple: Can this element be appear multiple times? Note:
|
|
Currently only enforced for encoding.
|
|
@cvar mandatory: Must this element appear in all EBML files using
|
|
this element's schema? Note: Not currently enforced.
|
|
@cvar children: A list of valid child element types. Only applicable to
|
|
`Document` and `Master` subclasses. Note: Not currently enforced.
|
|
@cvar dtype: The element's native Python data type.
|
|
@cvar precache: If `True`, the Element's value is read when the Element
|
|
is parsed. if `False`, the value is lazy-loaded when needed.
|
|
Numeric element types default to `True`. Can be used to reduce
|
|
the number of file seeks, potentially speeding things up.
|
|
@cvar length: An explicit length (in bytes) of the element when
|
|
encoding. `None` will use standard EBML variable-length encoding.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
|
|
# Parent `Schema`
|
|
schema = None
|
|
|
|
# Python native data type.
|
|
dtype = bytearray
|
|
|
|
# Should this element's value be read/cached when the element is parsed?
|
|
precache = False
|
|
|
|
# Do valid EBML documents require this element?
|
|
mandatory = False
|
|
|
|
# Does a valid EBML document permit more than one of the element?
|
|
multiple = False
|
|
|
|
# Explicit length for this Element subclass, used for encoding.
|
|
length = None
|
|
|
|
# For python-ebml compatibility; not currently used.
|
|
children = None
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
# Document-wide caching could be implemented here.
|
|
return bytearray(stream.read(size))
|
|
|
|
def __init__(self, stream=None, offset=0, size=0, payloadOffset=0):
|
|
""" Constructor. Instantiate a new Element from a file. In most cases,
|
|
elements should be created when a `Document` is loaded, rather
|
|
than instantiated explicitly.
|
|
|
|
@keyword stream: A file-like object containing EBML data.
|
|
@keyword offset: The element's starting location in the file.
|
|
@keyword size: The size of the whole element.
|
|
@keyword payloadOffset: The starting location of the element's
|
|
payload (i.e. immediately after the element's header).
|
|
"""
|
|
self.stream = stream
|
|
self.offset = offset
|
|
self.size = size
|
|
self.payloadOffset = payloadOffset
|
|
self._value = None
|
|
|
|
def __repr__(self):
|
|
return "<%s (ID:0x%02X), offset %s, size %s>" % \
|
|
(self.__class__.__name__, self.id, self.offset, self.size)
|
|
|
|
def __eq__(self, other):
|
|
""" Equality check. Elements are considered equal if they are the same
|
|
type and have the same ID, size, offset, and schema. Note: element
|
|
value is not considered! Check for value equality explicitly
|
|
(e.g. ``el1.value == el2.value``).
|
|
"""
|
|
if other is self:
|
|
return True
|
|
try:
|
|
return (self.dtype == other.dtype
|
|
and self.id == other.id
|
|
and self.offset == other.offset
|
|
and self.size == other.size
|
|
and self.schema == other.schema)
|
|
except AttributeError:
|
|
return False
|
|
|
|
@property
|
|
def value(self):
|
|
""" Parse and cache the element's value. """
|
|
if self._value is not None:
|
|
return self._value
|
|
self.stream.seek(self.payloadOffset)
|
|
self._value = self.parse(self.stream, self.size)
|
|
return self._value
|
|
|
|
def getRaw(self):
|
|
""" Get the element's raw binary data, including EBML headers.
|
|
"""
|
|
self.stream.seek(self.offset)
|
|
return self.stream.read(self.size + (self.payloadOffset - self.offset))
|
|
|
|
def getRawValue(self):
|
|
""" Get the raw binary of the element's value.
|
|
"""
|
|
self.stream.seek(self.payloadOffset)
|
|
return self.stream.read(self.size)
|
|
|
|
# ==========================================================================
|
|
# Caching (experimental)
|
|
# ==========================================================================
|
|
|
|
def gc(self, recurse=False):
|
|
""" Clear any cached values. To save memory and/or force values to be
|
|
re-read from the file. Returns the number of cached values cleared.
|
|
"""
|
|
if self._value is None:
|
|
return 0
|
|
|
|
self._value = None
|
|
return 1
|
|
|
|
# ==========================================================================
|
|
# Encoding
|
|
# ==========================================================================
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder. """
|
|
return encoding.encodeBinary(data, length)
|
|
|
|
@classmethod
|
|
def encode(cls, value, length=None, lengthSize=None, infinite=False):
|
|
""" Encode an EBML element.
|
|
|
|
@param value: The value to encode, or a list of values to encode.
|
|
If a list is provided, each item will be encoded as its own
|
|
element.
|
|
@keyword length: An explicit length for the encoded data,
|
|
overriding the variable length encoding. For producing
|
|
byte-aligned structures.
|
|
@keyword lengthSize: An explicit length for the encoded element
|
|
size, overriding the variable length encoding.
|
|
@return: A bytearray containing the encoded EBML data.
|
|
"""
|
|
if infinite and not issubclass(cls, MasterElement):
|
|
raise ValueError("Only Master elements can have 'infinite' lengths")
|
|
length = cls.length if length is None else length
|
|
if isinstance(value, (list, tuple)):
|
|
if not cls.multiple:
|
|
raise ValueError("Multiple %s elements per parent not permitted"
|
|
% cls.name)
|
|
result = bytearray()
|
|
for v in value:
|
|
result.extend(cls.encode(v, length, lengthSize, infinite))
|
|
return result
|
|
payload = cls.encodePayload(value, length=length)
|
|
length = None if infinite else (length or len(payload))
|
|
encId = encoding.encodeId(cls.id)
|
|
return encId + encoding.encodeSize(length, lengthSize) + payload
|
|
|
|
def dump(self):
|
|
""" Dump this element's value as nested dictionaries, keyed by
|
|
element name. For non-master elements, this just returns the
|
|
element's value; this method exists to maintain uniformity.
|
|
"""
|
|
return self.value
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class IntegerElement(Element):
|
|
""" Base class for an EBML signed integer element. Schema-specific
|
|
subclasses are generated when a `Schema` is loaded.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
dtype = int
|
|
precache = True
|
|
|
|
def __eq__(self, other):
|
|
if not super(IntegerElement, self).__eq__(other):
|
|
return False
|
|
return self.value == other.value
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
return readInt(stream, size)
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for signed integer elements. """
|
|
return encoding.encodeInt(data, length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class UIntegerElement(IntegerElement):
|
|
""" Base class for an EBML unsigned integer element. Schema-specific
|
|
subclasses are generated when a `Schema` is loaded.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
dtype = int
|
|
precache = True
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
return readUInt(stream, size)
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for unsigned integer elements. """
|
|
return encoding.encodeUInt(data, length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class FloatElement(Element):
|
|
""" Base class for an EBML floating point element. Schema-specific
|
|
subclasses are generated when a `Schema` is loaded.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
dtype = float
|
|
precache = True
|
|
|
|
def __eq__(self, other):
|
|
if not super(FloatElement, self).__eq__(other):
|
|
return False
|
|
return self.value == other.value
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
return readFloat(stream, size)
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for floating point elements. """
|
|
return encoding.encodeFloat(data, length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class StringElement(Element):
|
|
""" Base class for an EBML ASCII string element. Schema-specific
|
|
subclasses are generated when a `Schema` is loaded.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
dtype = str
|
|
|
|
def __eq__(self, other):
|
|
if not super(StringElement, self).__eq__(other):
|
|
return False
|
|
return self.value == other.value
|
|
|
|
def __len__(self):
|
|
return self.size
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
return readString(stream, size)
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for ASCII string elements. """
|
|
return encoding.encodeString(data, length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class UnicodeElement(StringElement):
|
|
""" Base class for an EBML UTF-8 string element. Schema-specific subclasses
|
|
are generated when a `Schema` is loaded.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
dtype = str
|
|
|
|
def __len__(self):
|
|
# Value may be multiple bytes per character
|
|
return len(self.value)
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
return readUnicode(stream, size)
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for Unicode string elements. """
|
|
return encoding.encodeUnicode(data, length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class DateElement(IntegerElement):
|
|
""" Base class for an EBML 'date' element. Schema-specific subclasses are
|
|
generated when a `Schema` is loaded.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
dtype = datetime
|
|
|
|
def parse(self, stream, size):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
It is assumed the file pointer is at the start of the payload.
|
|
"""
|
|
return readDate(stream, size)
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for date elements. """
|
|
return encoding.encodeDate(data, length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class BinaryElement(Element):
|
|
""" Base class for an EBML 'binary' element. Schema-specific subclasses
|
|
are generated when a `Schema` is loaded.
|
|
"""
|
|
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
|
|
def __len__(self):
|
|
return self.size
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class VoidElement(BinaryElement):
|
|
""" Special case ``Void`` element. Its contents are ignored and not read;
|
|
its `value` is always returned as ``0xFF`` times its length. To get
|
|
the actual contents, use `getRawValue()`.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
|
|
|
|
def parse(self, stream, size):
|
|
return bytearray()
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=0):
|
|
""" Type-specific payload encoder for Void elements. """
|
|
length = 0 if length is None else length
|
|
return bytearray(b'\xff' * length)
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class UnknownElement(BinaryElement):
|
|
""" Special case ``Unknown`` element, used for elements with IDs not
|
|
present in a schema. Unlike other elements, each instance has its own
|
|
ID.
|
|
"""
|
|
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value", "id",
|
|
"schema")
|
|
name = "UnknownElement"
|
|
precache = False
|
|
|
|
def __init__(self, stream=None, offset=0, size=0, payloadOffset=0, eid=None,
|
|
schema=None):
|
|
""" Constructor. Instantiate a new `UnknownElement` from a file. In
|
|
most cases, elements should be created when a `Document` is loaded,
|
|
rather than instantiated explicitly.
|
|
|
|
@keyword stream: A file-like object containing EBML data.
|
|
@keyword offset: The element's starting location in the file.
|
|
@keyword size: The size of the whole element.
|
|
@keyword payloadOffset: The starting location of the element's
|
|
payload (i.e. immediately after the element's header).
|
|
@keyword id: The unknown element's ID. Unlike 'normal' elements,
|
|
in which ID is a class attribute, each UnknownElement instance
|
|
explicitly defines this.
|
|
@keyword schema: The schema used to load the element. Specified
|
|
explicitly because `UnknownElement`s are not part of any
|
|
schema.
|
|
"""
|
|
super(UnknownElement, self).__init__(stream, offset, size,
|
|
payloadOffset)
|
|
self.id = eid
|
|
self.schema = schema
|
|
|
|
def __eq__(self, other):
|
|
""" Equality check. Unknown elements are considered equal if they have
|
|
the same ID and value. Note that this differs from the criteria
|
|
used for other element classes!
|
|
"""
|
|
if other is self:
|
|
return True
|
|
try:
|
|
return (self.name == other.name
|
|
and self.id == other.id
|
|
and self.value == other.value)
|
|
except AttributeError:
|
|
return False
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
class MasterElement(Element):
|
|
""" Base class for an EBML 'master' element, a container for other
|
|
elements.
|
|
"""
|
|
__slots__ = ("stream", "offset", "sizeLength", "payloadOffset", "_value",
|
|
"_size", "_length")
|
|
dtype = list
|
|
|
|
def parse(self):
|
|
""" Type-specific helper function for parsing the element's payload.
|
|
"""
|
|
# Special case; unlike other elements, value() property doesn't call
|
|
# parse(). Used only when pre-caching.
|
|
return self.value
|
|
|
|
def parseElement(self, stream, nocache=False):
|
|
""" Read the next element from a stream, instantiate a `MasterElement`
|
|
object, and then return it and the offset of the next element
|
|
(this element's position + size).
|
|
|
|
@param stream: The source file-like stream.
|
|
@keyword nocache: If `True`, the parsed element's `precache`
|
|
attribute is ignored, and the element's value will not be
|
|
cached. For faster iteration when the element value doesn't
|
|
matter (e.g. counting child elements).
|
|
@return: The parsed element and the offset of the next element
|
|
(i.e. the end of the parsed element).
|
|
"""
|
|
offset = stream.tell()
|
|
eid, idlen = readElementID(stream)
|
|
esize, sizelen = readElementSize(stream)
|
|
payloadOffset = offset + idlen + sizelen
|
|
|
|
try:
|
|
etype = self.schema.elements[eid]
|
|
el = etype(stream, offset, esize, payloadOffset)
|
|
except KeyError:
|
|
el = self.schema.UNKNOWN(stream, offset, esize, payloadOffset,
|
|
eid=eid, schema=self.schema)
|
|
|
|
if el.precache and not nocache:
|
|
# Read the value now, avoiding a seek later.
|
|
el._value = el.parse(stream, el.size)
|
|
|
|
return el, payloadOffset + el.size
|
|
|
|
@classmethod
|
|
def _isValidChild(cls, elId):
|
|
""" Is the given element ID represent a valid sub-element, i.e.
|
|
explicitly specified as a child element or a 'global' in the
|
|
schema?
|
|
"""
|
|
if not cls.children:
|
|
return False
|
|
|
|
try:
|
|
return elId in cls._childIds
|
|
except AttributeError:
|
|
# The set of valid child IDs hasn't been created yet.
|
|
cls._childIds = set(cls.children)
|
|
if cls.schema is not None:
|
|
cls._childIds.update(cls.schema.globals)
|
|
return elId in cls._childIds
|
|
|
|
@property
|
|
def size(self):
|
|
""" The element's size. Master elements can be instantiated with this
|
|
as `None`; this denotes an 'infinite' EBML element, and its size
|
|
will be determined by iterating over its contents until an invalid
|
|
child type is found, or the end-of-file is reached.
|
|
"""
|
|
try:
|
|
return self._size
|
|
except AttributeError:
|
|
# An "infinite" element (size specified in file is all 0xFF)
|
|
pos = end = self.payloadOffset
|
|
numChildren = 0
|
|
while True:
|
|
self.stream.seek(pos)
|
|
end = pos
|
|
try:
|
|
# TODO: Cache parsed elements?
|
|
el, pos = self.parseElement(self.stream, nocache=True)
|
|
if self._isValidChild(el.id):
|
|
numChildren += 1
|
|
else:
|
|
break
|
|
except TypeError as err:
|
|
# Will occur at end of file; message will contain "ord()".
|
|
if "ord()" in str(err):
|
|
break
|
|
# Not the expected EOF TypeError!
|
|
raise
|
|
|
|
self._size = end - self.payloadOffset
|
|
self._length = numChildren
|
|
return self._size
|
|
|
|
@size.setter
|
|
def size(self, esize):
|
|
if esize is not None:
|
|
# Only create the `_size` attribute for a real value. Don't
|
|
# define it if it's `None`, so `size` will get calculated.
|
|
self._size = esize
|
|
|
|
def __iter__(self, nocache=False):
|
|
""" x.__iter__() <==> iter(x)
|
|
"""
|
|
# TODO: Better support for 'infinite' elements (getting the size of
|
|
# an infinite element iterates over it, so there's duplicated effort.)
|
|
pos = self.payloadOffset
|
|
payloadEnd = pos + self.size
|
|
|
|
while pos < payloadEnd:
|
|
self.stream.seek(pos)
|
|
try:
|
|
el, pos = self.parseElement(self.stream, nocache=nocache)
|
|
yield el
|
|
except TypeError as err:
|
|
if "ord()" in str(err):
|
|
break
|
|
raise
|
|
|
|
def __len__(self):
|
|
""" x.__len__() <==> len(x)
|
|
"""
|
|
try:
|
|
return self._length
|
|
except AttributeError:
|
|
if self._value is not None:
|
|
self._length = len(self._value)
|
|
else:
|
|
n = 0 # In case there's nothing to enumerate
|
|
for n, _el in enumerate(self.__iter__(nocache=True), 1):
|
|
pass
|
|
self._length = n
|
|
return self._length
|
|
|
|
@property
|
|
def value(self):
|
|
""" Parse and cache the element's value.
|
|
"""
|
|
if self._value is not None:
|
|
return self._value
|
|
self._value = list(self)
|
|
return self._value
|
|
|
|
def __getitem__(self, *args):
|
|
# TODO: Parse only the requested item(s), like `Document`
|
|
return self.value.__getitem__(*args)
|
|
|
|
# ==========================================================================
|
|
# Caching (experimental!)
|
|
# ==========================================================================
|
|
|
|
def gc(self, recurse=False):
|
|
""" Clear any cached values. To save memory and/or force values to be
|
|
re-read from the file.
|
|
"""
|
|
cleared = 0
|
|
if self._value is not None:
|
|
if recurse:
|
|
cleared = sum(ch.gc(recurse) for ch in self._value) + 1
|
|
self._value = None
|
|
return cleared
|
|
|
|
# ==========================================================================
|
|
# Encoding
|
|
# ==========================================================================
|
|
|
|
@classmethod
|
|
def encodePayload(cls, data, length=None):
|
|
""" Type-specific payload encoder for 'master' elements.
|
|
"""
|
|
result = bytearray()
|
|
if data is None:
|
|
return result
|
|
elif isinstance(data, dict):
|
|
data = data.items()
|
|
elif not isinstance(data, (list, tuple)):
|
|
raise TypeError("wrong type for %s payload: %s" % (cls.name,
|
|
type(data)))
|
|
for k, v in data:
|
|
if k not in cls.schema:
|
|
raise TypeError("Element type %r not found in schema" % k)
|
|
# TODO: Validation of hierarchy, multiplicity, mandate, etc.
|
|
result.extend(cls.schema[k].encode(v))
|
|
|
|
return result
|
|
|
|
@classmethod
|
|
def encode(cls, data, length=None, lengthSize=None, infinite=False):
|
|
""" Encode an EBML master element.
|
|
|
|
@param data: The data to encode, provided as a dictionary keyed by
|
|
element name, a list of two-item name/value tuples, or a list
|
|
of either. Note: individual items in a list of name/value
|
|
pairs *must* be tuples!
|
|
@keyword infinite: If `True`, the element will be written with an
|
|
undefined size. When parsed, its end will be determined by the
|
|
occurrence of an invalid child element (or end-of-file).
|
|
@return: A bytearray containing the encoded EBML binary.
|
|
"""
|
|
# TODO: Use 'length' to automatically generate `Void` element?
|
|
if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
|
|
# List of lists: special case for 'master' elements.
|
|
# Encode as multiple 'master' elements.
|
|
result = bytearray()
|
|
for v in data:
|
|
result.extend(cls.encode(v, length=length,
|
|
lengthSize=lengthSize,
|
|
infinite=infinite))
|
|
return result
|
|
|
|
# TODO: Remove 'infinite' kwarg from `Element.encode()` and handle it
|
|
# here, since it only applied to Master elements.
|
|
return super(MasterElement, cls).encode(data, length=length,
|
|
lengthSize=lengthSize,
|
|
infinite=infinite)
|
|
|
|
def dump(self):
|
|
""" Dump this element's value as nested dictionaries, keyed by
|
|
element name. The values of 'multiple' elements return as lists.
|
|
Note: The order of 'multiple' elements relative to other elements
|
|
will be lost; a file containing elements ``A1 B1 A2 B2 A3 B3`` will
|
|
result in``[A1 A2 A3][B1 B2 B3]``.
|
|
|
|
@todo: Decide if this should be in the `util` submodule. It is
|
|
very specific, and it isn't totally necessary for the core
|
|
library.
|
|
"""
|
|
result = Dict()
|
|
for el in self:
|
|
if el.multiple:
|
|
result.setdefault(el.name, []).append(el.dump())
|
|
else:
|
|
result[el.name] = el.dump()
|
|
return result
|
|
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
|
|
class Document(MasterElement):
|
|
""" Base class for an EBML document, containing multiple 'root' elements.
|
|
Loading a `Schema` generates a subclass.
|
|
"""
|
|
|
|
def __init__(self, stream, name=None, size=None, headers=True):
|
|
""" Constructor. Instantiate a `Document` from a file-like stream.
|
|
In most cases, `Schema.load()` should be used instead of
|
|
explicitly instantiating a `Document`.
|
|
|
|
@param stream: A stream object (e.g. a file) from which to read
|
|
the EBML content.
|
|
@keyword name: The name of the document. Defaults to the filename
|
|
(if applicable).
|
|
@keyword size: The size of the document, in bytes. Use if the
|
|
stream is neither a file or a `BytesIO` object.
|
|
@keyword headers: If `False`, the file's ``EBML`` header element
|
|
(if present) will not appear as a root element in the document.
|
|
The contents of the ``EBML`` element will always be read,
|
|
regardless, and stored in the Document's `info` attribute.
|
|
"""
|
|
self._ownsStream = False
|
|
if isinstance(stream, (str, bytes, bytearray)):
|
|
stream = open(stream, 'rb')
|
|
self._ownsStream = True
|
|
|
|
if not all((hasattr(stream, 'read'),
|
|
hasattr(stream, 'tell'),
|
|
hasattr(stream, 'seek'))):
|
|
raise TypeError('Object %r does not have the necessary stream methods' % stream)
|
|
|
|
self._value = None
|
|
self.stream = stream
|
|
self.size = size
|
|
self.name = name
|
|
self.id = None # Not applicable to Documents.
|
|
self.offset = self.payloadOffset = self.stream.tell()
|
|
|
|
try:
|
|
self.filename = stream.name
|
|
except AttributeError:
|
|
self.filename = ""
|
|
|
|
if name is None:
|
|
if self.filename:
|
|
self.name = os.path.splitext(os.path.basename(self.filename))[0]
|
|
else:
|
|
self.name = self.__class__.__name__
|
|
|
|
if size is None:
|
|
# Note: this doesn't work for cStringIO!
|
|
if isinstance(stream, BytesIO):
|
|
self.size = len(stream.getvalue())
|
|
elif self.filename and os.path.exists(self.filename):
|
|
self.size = os.path.getsize(self.stream.name)
|
|
|
|
self.info = {}
|
|
|
|
try:
|
|
# Attempt to read the first element, which should be an EBML header.
|
|
el, pos = self.parseElement(self.stream)
|
|
if el.name == "EBML":
|
|
# Load 'header' info from the file
|
|
self.info = el.dump()
|
|
if not headers:
|
|
self.payloadOffset = pos
|
|
except:
|
|
# Failed to read the first element. Don't raise here; do that when
|
|
# the Document is actually used.
|
|
pass
|
|
|
|
def __repr__(self):
|
|
""" "x.__repr__() <==> repr(x) """
|
|
if self.name == self.__class__.__name__:
|
|
return object.__repr__(self)
|
|
return "<%s %r at 0x%08X>" % (self.__class__.__name__, self.name,
|
|
id(self))
|
|
|
|
def __enter__(self):
|
|
""" Enter context manager for this document.
|
|
"""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
""" Close this document on exiting context manager.
|
|
"""
|
|
self.close()
|
|
|
|
def close(self):
|
|
""" Closes the EBML file. If the `Document` was created using a
|
|
file/stream (as opposed to a filename), the source file/stream is
|
|
not closed.
|
|
"""
|
|
if self._ownsStream:
|
|
self.stream.close()
|
|
|
|
def __len__(self):
|
|
""" x.__len__() <==> len(x)
|
|
Not recommended for huge documents.
|
|
"""
|
|
try:
|
|
return self._length
|
|
except AttributeError:
|
|
n = 0 # in case there's nothing to enumerate
|
|
for n, _el in enumerate(self.__iter__(nocache=True), 1):
|
|
pass
|
|
self._length = n
|
|
return self._length
|
|
|
|
def __iter__(self, nocache=False):
|
|
""" Iterate root elements.
|
|
"""
|
|
# TODO: Cache root elements, prevent unnecessary duplicates. Maybe a
|
|
# dict keyed by offset?
|
|
pos = self.payloadOffset
|
|
while True:
|
|
self.stream.seek(pos)
|
|
try:
|
|
el, pos = self.parseElement(self.stream, nocache=nocache)
|
|
yield el
|
|
except TypeError as err:
|
|
# Occurs at end of file (parsing 0 length string), it's okay.
|
|
if "ord()" not in str(err):
|
|
# (Apparently) not the TypeError raised at EOF!
|
|
raise
|
|
break
|
|
|
|
@property
|
|
def value(self):
|
|
""" An iterator for iterating the document's root elements. Same as
|
|
`Document.__iter__()`.
|
|
"""
|
|
# 'value' not really applicable to a document; return an iterator.
|
|
return iter(self)
|
|
|
|
def __getitem__(self, idx):
|
|
""" Get one of the document's root elements by index.
|
|
"""
|
|
# TODO: Cache parsed root elements, handle indexing dynamically.
|
|
if isinstance(idx, int):
|
|
if idx < 0:
|
|
raise IndexError("Negative indices in a Document not (yet) supported")
|
|
n = None
|
|
for n, el in enumerate(self):
|
|
if n == idx:
|
|
return el
|
|
if n is None:
|
|
# If object being enumerated is empty, `n` is never set.
|
|
raise IndexError("Document contained no readable data")
|
|
raise IndexError("list index out of range (0-%d)" % n)
|
|
elif isinstance(idx, slice):
|
|
raise IndexError("Document root slicing not (yet) supported")
|
|
else:
|
|
raise TypeError("list indices must be integers, not %s" % type(idx))
|
|
|
|
@property
|
|
def version(self):
|
|
""" The document's type version (i.e. the EBML ``DocTypeVersion``). """
|
|
return self.info.get('DocTypeVersion')
|
|
|
|
@property
|
|
def type(self):
|
|
""" The document's type name (i.e. the EBML ``DocType``). """
|
|
return self.info.get('DocType')
|
|
|
|
# ==========================================================================
|
|
# Caching (experimental!)
|
|
# ==========================================================================
|
|
|
|
def gc(self, recurse=False):
|
|
# TODO: Implement this if/when caching of root elements is implemented.
|
|
return 0
|
|
|
|
# ==========================================================================
|
|
# Encoding
|
|
# ==========================================================================
|
|
|
|
@classmethod
|
|
def _createHeaders(cls):
|
|
""" Create the default EBML 'header' elements for a Document, using
|
|
the default values in the schema.
|
|
|
|
@return: A dictionary containing a single key (``EBML``) with a
|
|
dictionary as its value. The child dictionary contains
|
|
element names and values.
|
|
"""
|
|
if 'EBML' not in cls.schema:
|
|
return {}
|
|
|
|
headers = Dict()
|
|
for elName, elType in (('EBMLVersion', int),
|
|
('EBMLReadVersion', int),
|
|
('DocType', str),
|
|
('DocTypeVersion', int),
|
|
('DocTypeReadVersion', int)):
|
|
if elName in cls.schema:
|
|
v = cls.schema._getInfo(cls.schema[elName].id, elType)
|
|
if v is not None:
|
|
headers[elName] = v
|
|
|
|
return Dict(EBML=headers)
|
|
|
|
@classmethod
|
|
def encode(cls, stream, data, headers=False, **kwargs):
|
|
""" Encode an EBML document.
|
|
|
|
@param value: The data to encode, provided as a dictionary keyed
|
|
by element name, or a list of two-item name/value tuples.
|
|
Note: individual items in a list of name/value pairs *must*
|
|
be tuples!
|
|
@return: A bytearray containing the encoded EBML binary.
|
|
"""
|
|
if headers is True:
|
|
stream.write(cls.encodePayload(cls._createHeaders()))
|
|
|
|
if isinstance(data, list):
|
|
if len(data) > 0 and isinstance(data[0], list):
|
|
# List of lists: special case for Documents.
|
|
# Encode as multiple 'root' elements.
|
|
raise TypeError('Cannot encode multiple Documents')
|
|
else:
|
|
for v in data:
|
|
stream.write(cls.encodePayload(v))
|
|
else:
|
|
stream.write(cls.encodePayload(data))
|
|
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
|
|
class Schema(object):
|
|
""" An EBML schema, mapping element IDs to names and data types. Unlike
|
|
the document and element types, this is not a base class; all schemata
|
|
are actual instances of this class.
|
|
|
|
@ivar document: The schema's Document subclass.
|
|
@ivar elements: A dictionary mapping element IDs to the schema's
|
|
corresponding `Element` subclasses.
|
|
@ivar elementsByName: A dictionary mapping element names to the
|
|
schema's corresponding `Element` subclasses.
|
|
@ivar elementInfo: A dictionary mapping IDs to the raw schema
|
|
attribute data. It may have additional items not present in the
|
|
created element class' attributes.
|
|
|
|
@ivar UNKNOWN: A class/function that handles unknown element IDs. By
|
|
default, this is the `UnknownElement` class. Special-case handling
|
|
can be done by substituting a different class, or an
|
|
element-producing factory function.
|
|
|
|
@ivar source: The source from which the Schema was loaded; either a
|
|
filename or a file-like stream.
|
|
@ivar filename: The absolute path of the source file, if the source
|
|
was a file or a filename.
|
|
"""
|
|
|
|
BASE_CLASSES = {
|
|
'BinaryElement': BinaryElement,
|
|
'DateElement': DateElement,
|
|
'FloatElement': FloatElement,
|
|
'IntegerElement': IntegerElement,
|
|
'MasterElement': MasterElement,
|
|
'StringElement': StringElement,
|
|
'UIntegerElement': UIntegerElement,
|
|
'UnicodeElement': UnicodeElement,
|
|
}
|
|
|
|
# Mapping of schema type names to the corresponding Element subclasses.
|
|
# For python-ebml schema compatibility.
|
|
ELEMENT_TYPES = {
|
|
'integer': IntegerElement,
|
|
'uinteger': UIntegerElement,
|
|
'float': FloatElement,
|
|
'string': StringElement,
|
|
'utf-8': UnicodeElement,
|
|
'date': DateElement,
|
|
'binary': BinaryElement,
|
|
'master': MasterElement,
|
|
}
|
|
|
|
# The handler for unknown element IDs. By default, this is just the
|
|
# `UnknownElement` class. Special-case handling of unknown elements can
|
|
# be done by substituting a different class, or an element-producing
|
|
# factory function.
|
|
UNKNOWN = UnknownElement
|
|
|
|
def __init__(self, source, name=None):
|
|
""" Constructor. Creates a new Schema from a schema description XML.
|
|
|
|
@param source: The Schema's source, either a string with the full
|
|
path and name of the schema XML file, or a file-like stream.
|
|
@keyword name: The schema's name. Defaults to the document type
|
|
element's default value (if defined) or the base file name.
|
|
"""
|
|
self.source = source
|
|
self.filename = None
|
|
|
|
if isinstance(source, (str, bytes, bytearray)):
|
|
self.filename = os.path.realpath(source)
|
|
elif hasattr(source, "name"):
|
|
self.filename = os.path.realpath(source.name)
|
|
|
|
self.elements = {} # Element types, keyed by ID
|
|
self.elementsByName = {} # Element types, keyed by element name
|
|
self.elementInfo = {} # Raw element schema attributes, keyed by ID
|
|
|
|
self.globals = {} # Elements valid for any parent, by ID
|
|
self.children = {} # Valid root elements, by ID
|
|
|
|
# Parse, using the correct method for the schema format.
|
|
schema = ET.parse(source)
|
|
root = schema.getroot()
|
|
if root.tag == "table":
|
|
# Old python-ebml schema: root element is <table>
|
|
self._parseLegacySchema(root)
|
|
elif root.tag == "Schema":
|
|
# new ebmlite schema: root element is <Schema>
|
|
self._parseSchema(root, self)
|
|
else:
|
|
raise IOError("Could not parse schema; expected root element "
|
|
"<Schema> or <table>, got <%s>" % root.tag)
|
|
|
|
# Special case: `Void` is a standard EBML element, but not its own
|
|
# type (it's technically binary). Use the special `VoidElement` type.
|
|
if 'Void' in self.elementsByName:
|
|
el = self.elementsByName['Void']
|
|
void = type('VoidElement', (VoidElement,),
|
|
{'id': el.id, 'name': 'Void', 'schema': self,
|
|
'mandatory': el.mandatory, 'multiple': el.multiple})
|
|
self.elements[el.id] = void
|
|
self.elementsByName['Void'] = void
|
|
|
|
# Schema name. Defaults to the schema's default EBML 'DocType'
|
|
self.name = name or self.type
|
|
|
|
# Create the schema's Document subclass.
|
|
self.document = type('%sDocument' % self.name.title(), (Document,),
|
|
{'schema': self, 'children': self.children})
|
|
|
|
def _parseLegacySchema(self, schema):
|
|
""" Parse a legacy python-ebml schema XML file.
|
|
"""
|
|
for el in schema.findall('element'):
|
|
attribs = el.attrib.copy()
|
|
|
|
eid = int(attribs['id'], 16) if 'id' in attribs else None
|
|
ename = attribs['name'].strip() if 'name' in attribs else None
|
|
etype = attribs['type'].strip() if 'type' in attribs else None
|
|
|
|
# Use text in the element as its docstring. Note: embedded HTML
|
|
# tags (as in the Matroska schema) will cause the text to be
|
|
# truncated.
|
|
docs = el.text.strip() if isinstance(el.text, (str, bytes, bytearray)) else None
|
|
|
|
if etype is None:
|
|
raise ValueError('Element "%s" (ID 0x%02X) missing required '
|
|
'"type" attribute' % (ename, eid))
|
|
|
|
if etype not in self.ELEMENT_TYPES:
|
|
raise ValueError("Unknown type for element %r (ID 0x%02x): %r" %
|
|
(ename, eid, etype))
|
|
|
|
self.addElement(eid, ename, self.ELEMENT_TYPES[etype], attribs,
|
|
docs=docs)
|
|
|
|
def _parseSchema(self, el, parent=None):
|
|
""" Recursively crawl a schema XML definition file.
|
|
"""
|
|
if el.tag == "Schema":
|
|
for chEl in el:
|
|
self._parseSchema(chEl, self)
|
|
return
|
|
|
|
if el.tag not in self.BASE_CLASSES:
|
|
if el.tag.endswith('Element'):
|
|
raise ValueError('Unknown element type: %s' % el.tag)
|
|
|
|
# FUTURE: Add schema-describing metadata (author, origin,
|
|
# description, etc.) to XML as non-Element elements. Parse them
|
|
# out here.
|
|
return
|
|
|
|
attribs = el.attrib.copy()
|
|
eid = int(attribs['id'], 16) if 'id' in attribs else None
|
|
ename = attribs['name'].strip() if 'name' in attribs else None
|
|
|
|
# Use text in the element as its docstring. Note: embedded HTML tags
|
|
# (as in the Matroska schema) will cause the text to be truncated.
|
|
docs = el.text.strip() if isinstance(el.text, (str, bytes, bytearray)) else None
|
|
|
|
baseClass = self.BASE_CLASSES[el.tag]
|
|
|
|
cls = self.addElement(eid, ename, baseClass, attribs, parent, docs)
|
|
|
|
if baseClass is MasterElement:
|
|
for chEl in el:
|
|
self._parseSchema(chEl, cls)
|
|
|
|
def addElement(self, eid, ename, baseClass, attribs={}, parent=None,
|
|
docs=None):
|
|
""" Create a new `Element` subclass and add it to the schema.
|
|
|
|
Duplicate elements are permitted (e.g. if one kind of element can
|
|
appear in different master elements), provided their attributes do
|
|
not conflict. The first appearance of an element definition in the
|
|
schema must contain the required ID, name, and type; successive
|
|
appearances only need the ID and/or name.
|
|
|
|
@param eid: The element's EBML ID.
|
|
@param ename: The element's name.
|
|
@keyword multiple: If `True`, an EBML document can contain more
|
|
than one of this element. Not currently enforced.
|
|
@keyword mandatory: If `True`, a valid EBML document requires one
|
|
(or more) of this element. Not currently enforced.
|
|
@keyword length: A fixed length to use when writing the element.
|
|
`None` will use the minimum length required.
|
|
@keyword precache: If `True`, the element's value will be read
|
|
when the element is parsed, rather than when the value is
|
|
explicitly accessed. Can save time for small elements.
|
|
@keyword attribs: A dictionary of raw element attributes, as read
|
|
from the schema file.
|
|
@keyword parent: The new element's parent element class.
|
|
@keyword docs: The new element's docstring (e.g. the defining XML
|
|
element's text content).
|
|
"""
|
|
|
|
def _getBool(d, k, default):
|
|
""" Helper function to get a dictionary value cast to bool. """
|
|
try:
|
|
return str(d[k]).strip()[0] in 'Tt1'
|
|
except (KeyError, TypeError, IndexError, ValueError):
|
|
# TODO: Don't fail silently for some exceptions.
|
|
pass
|
|
return default
|
|
|
|
def _getInt(d, k, default):
|
|
""" Helper function to get a dictionary value cast to int. """
|
|
try:
|
|
return int(literal_eval(d[k].strip()))
|
|
except (KeyError, SyntaxError, TypeError, ValueError):
|
|
# TODO: Don't fail silently for some exceptions.
|
|
pass
|
|
return default
|
|
|
|
if eid in self.elements or ename in self.elementsByName:
|
|
# Already appeared in schema. Duplicates are permitted for
|
|
# defining an element that can appear as a child to multiple
|
|
# Master elements, so long as they have the same attributes.
|
|
# Additional definitions only need to specify the element ID
|
|
# and/or element name.
|
|
oldEl = self[ename or eid]
|
|
ename = oldEl.name
|
|
eid = oldEl.id
|
|
|
|
if not issubclass(self.elements[eid], baseClass):
|
|
raise TypeError('%s %r (ID 0x%02X) redefined as %s' %
|
|
(oldEl.__name__, ename, eid, baseClass.__name__))
|
|
|
|
newatts = self.elementInfo[eid].copy()
|
|
newatts.update(attribs)
|
|
if self.elementInfo[eid] == newatts:
|
|
eclass = self.elements[eid]
|
|
else:
|
|
raise TypeError('Element %r (ID 0x%02X) redefined with '
|
|
'different attributes' % (ename, eid))
|
|
else:
|
|
# New element class. It requires both a name and an ID.
|
|
# Validate both the name and the ID.
|
|
if eid is None:
|
|
raise ValueError('Element definition missing required '
|
|
'"id" attribute')
|
|
elif not isinstance(eid, int):
|
|
raise TypeError("Invalid type for element ID: " +
|
|
"{} ({})".format(eid, type(eid).__name__))
|
|
|
|
if ename is None:
|
|
raise ValueError('Element definition missing required '
|
|
'"name" attribute')
|
|
elif not isinstance(ename, (str, bytes, bytearray)):
|
|
raise TypeError('Invalid type for element name: ' +
|
|
'{} ({})'.format(ename, type(ename).__name__))
|
|
elif not (ename[0].isalpha() or ename[0] == "_"):
|
|
raise ValueError("Invalid element name: %r" % ename)
|
|
|
|
mandatory = _getBool(attribs, 'mandatory', False)
|
|
multiple = _getBool(attribs, 'multiple', False)
|
|
precache = _getBool(attribs, 'precache', baseClass.precache)
|
|
length = _getInt(attribs, 'length', None)
|
|
isGlobal = _getInt(attribs, 'global', None)
|
|
|
|
if isGlobal is None:
|
|
# Element 'level'. The old schema format used level to define
|
|
# the structure (the file itself was flat); the new format's
|
|
# schema structure defined the EBML structure. The exception
|
|
# are 'global' elements, which may appear anywhere. The old
|
|
# format defined these as having a level of -1. The new format
|
|
# uses a Boolean attribute, `global`, but fall back to
|
|
# reading `level` if `global` isn't defined.
|
|
isGlobal = _getInt(attribs, 'level', None) == -1
|
|
|
|
# Create a new Element subclass
|
|
eclass = type('%sElement' % ename, (baseClass,),
|
|
{'id': eid, 'name': ename, 'schema': self,
|
|
'mandatory': mandatory, 'multiple': multiple,
|
|
'precache': precache, 'length': length,
|
|
'children': dict(), '__doc__': docs,
|
|
'__slots__': baseClass.__slots__})
|
|
|
|
self.elements[eid] = eclass
|
|
self.elementInfo[eid] = attribs
|
|
self.elementsByName[ename] = eclass
|
|
|
|
if isGlobal:
|
|
self.globals[eid] = eclass
|
|
|
|
parent = parent or self
|
|
if parent.children is None:
|
|
parent.children = {}
|
|
parent.children[eid] = eclass
|
|
|
|
return eclass
|
|
|
|
def __repr__(self):
|
|
try:
|
|
if isinstance(self.source, (BytesIO, StringIO)):
|
|
source = "string"
|
|
else:
|
|
source = "'%s'" % (self.filename or self.source)
|
|
return "<%s %r from %s>" % (self.__class__.__name__, self.name,
|
|
source)
|
|
except AttributeError:
|
|
return object.__repr__(self)
|
|
|
|
def __eq__(self, other):
|
|
""" Equality check. Schemata are considered equal if the attributes of
|
|
their elements match.
|
|
"""
|
|
try:
|
|
return self is other or self.elementInfo == other.elementInfo
|
|
except AttributeError:
|
|
return False
|
|
|
|
def __contains__(self, key):
|
|
""" Does the Schema contain a given element name or ID? """
|
|
return (key in self.elementsByName) or (key in self.elements)
|
|
|
|
def __getitem__(self, key):
|
|
""" Get an Element class from the schema, by name or by ID. """
|
|
try:
|
|
return self.elements[key]
|
|
except KeyError:
|
|
return self.elementsByName[key]
|
|
|
|
def get(self, key, default=None):
|
|
if key in self:
|
|
return self[key]
|
|
return default
|
|
|
|
def load(self, fp, name=None, headers=False, **kwargs):
|
|
""" Load an EBML file using this Schema.
|
|
|
|
@param fp: A file-like object containing the EBML to load, or the
|
|
name of an EBML file.
|
|
@keyword name: The name of the document. Defaults to filename.
|
|
@keyword headers: If `False`, the file's ``EBML`` header element
|
|
(if present) will not appear as a root element in the
|
|
document. The contents of the ``EBML`` element will always be
|
|
read.
|
|
"""
|
|
return self.document(fp, name=name, headers=headers, **kwargs)
|
|
|
|
def loads(self, data, name=None):
|
|
""" Load EBML from a string using this Schema.
|
|
|
|
@param data: A string or bytearray containing raw EBML data.
|
|
@keyword name: The name of the document. Defaults to the Schema's
|
|
document class name.
|
|
"""
|
|
# Below updated to add EBML headers to first fragement
|
|
#return self.load(BytesIO(data), name=name)
|
|
return self.load(BytesIO(data), name=name, headers=True)
|
|
|
|
def __call__(self, fp, name=None):
|
|
""" Load an EBML file using this Schema. Same as `Schema.load()`.
|
|
|
|
@todo: Decide if this is worth keeping. It exists for historical
|
|
reasons that may have been refactored out.
|
|
|
|
@param fp: A file-like object containing the EBML to load, or the
|
|
name of an EBML file.
|
|
@keyword name: The name of the document. Defaults to filename.
|
|
"""
|
|
return self.load(fp, name=name)
|
|
|
|
# ==========================================================================
|
|
# Schema info stuff. Uses python-ebml schema XML data. Refactor later.
|
|
# ==========================================================================
|
|
|
|
def _getInfo(self, eid, dtype):
|
|
""" Helper method to get the 'default' value of an element. """
|
|
try:
|
|
return dtype(self.elementInfo[eid]['default'])
|
|
except (KeyError, ValueError):
|
|
return None
|
|
|
|
@property
|
|
def version(self):
|
|
""" Schema version, extracted from EBML ``DocTypeVersion`` default. """
|
|
return self._getInfo(0x4287, int) # ID of EBML 'DocTypeVersion'
|
|
|
|
@property
|
|
def type(self):
|
|
""" Schema type name, extracted from EBML ``DocType`` default. """
|
|
return self._getInfo(0x4282, str) # ID of EBML 'DocType'
|
|
|
|
# ==========================================================================
|
|
# Encoding
|
|
# ==========================================================================
|
|
|
|
def encode(self, stream, data, headers=False):
|
|
""" Write an EBML document using this Schema to a file or file-like
|
|
stream.
|
|
|
|
@param stream: The file (or ``.write()``-supporting file-like
|
|
object) to which to write the encoded EBML.
|
|
@param data: The data to encode, provided as a dictionary keyed by
|
|
element name, or a list of two-item name/value tuples. Note:
|
|
individual items in a list of name/value pairs *must* be tuples!
|
|
"""
|
|
self.document.encode(stream, data, headers=headers)
|
|
return stream
|
|
|
|
def encodes(self, data, headers=False):
|
|
""" Create an EBML document using this Schema, returned as a string.
|
|
|
|
@param data: The data to encode, provided as a dictionary keyed by
|
|
element name, or a list of two-item name/value tuples. Note:
|
|
individual items in a list of name/value pairs *must* be tuples!
|
|
@return: A string containing the encoded EBML binary.
|
|
"""
|
|
stream = BytesIO()
|
|
self.encode(stream, data, headers=headers)
|
|
return stream.getvalue()
|
|
|
|
def verify(self, data):
|
|
""" Perform basic tests on EBML binary data, ensuring it can be parsed
|
|
using this `Schema`. Failure will raise an expression.
|
|
"""
|
|
|
|
def _crawl(el):
|
|
if isinstance(el, MasterElement):
|
|
for subel in el:
|
|
_crawl(subel)
|
|
elif isinstance(el, UnknownElement):
|
|
raise NameError("Verification failed, unknown element ID %x" %
|
|
el.id)
|
|
else:
|
|
_ = el.value
|
|
|
|
return True
|
|
|
|
return _crawl(self.loads(data))
|
|
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
def _expandSchemaPath(path, name=''):
|
|
""" Helper function to process a schema path or name, converting module
|
|
references to Paths.
|
|
|
|
@param path: The schema path. May be a directory name, a module
|
|
name in braces (e.g., `{idelib.schemata}`), or a module
|
|
instance. Directory and module names may contain schema
|
|
filenames.
|
|
@param name: An optional schema base filename. Will get appended
|
|
to the resulting `Path`/`Traversable`.
|
|
@return: A `Path`/`Traversable` object.
|
|
"""
|
|
strpath = str(path)
|
|
subdir = ''
|
|
|
|
if not strpath:
|
|
path = strpath = os.getcwd()
|
|
elif '{' in strpath:
|
|
if '}' not in strpath:
|
|
raise IOError(errno.ENOENT, 'Malformed module path', strpath)
|
|
|
|
m = re.match(r'(\{.+\})[/\\](.+)', strpath)
|
|
if m:
|
|
path, subdir = m.groups()
|
|
strpath = path
|
|
|
|
if importlib_resources:
|
|
if isinstance(path, types.ModuleType):
|
|
return importlib_resources.files(path) / subdir / name
|
|
elif '{' in strpath:
|
|
return importlib_resources.files(strpath.strip('{} ')) / subdir / name
|
|
else:
|
|
# Pre-3.9: Use naive means of finding the module path. Won't work in
|
|
# some cases (module is a zip, etc.); it's just a fallback. To be
|
|
# deprecated.
|
|
if isinstance(path, types.ModuleType):
|
|
path = os.path.dirname(path.__file__)
|
|
elif '{' in strpath:
|
|
path = os.path.dirname(importlib.import_module(strpath.strip('{}')).__file__)
|
|
|
|
return Path(path) / subdir / name
|
|
|
|
|
|
def listSchemata(*paths, absolute=True):
|
|
""" Gather all EBML schemata. `ebmlite.SCHEMA_PATH` is used by default;
|
|
alternatively, one or more paths or modules can be supplied as
|
|
arguments.
|
|
|
|
@returns: A dictionary of schema files. Keys are the base name of the
|
|
schema XML, values are lists of full paths to the XML. The first
|
|
filename in the list is what will load if the base name is used
|
|
with `loadSchema()`.
|
|
"""
|
|
schemata = {}
|
|
paths = paths or SCHEMA_PATH
|
|
|
|
for path in paths:
|
|
try:
|
|
fullpath = _expandSchemaPath(path)
|
|
except ModuleNotFoundError:
|
|
continue
|
|
|
|
if not fullpath.is_dir():
|
|
continue
|
|
|
|
for p in fullpath.iterdir():
|
|
key = p.name
|
|
if key.lower().endswith('.xml'):
|
|
try:
|
|
# Casting to string is py35 fix. Remove in future.
|
|
xml = ET.parse(str(p))
|
|
if xml.getroot().tag == 'Schema':
|
|
value = p if absolute else Path(path) / p.name
|
|
schemata.setdefault(key, []).append(value)
|
|
except (ET.ParseError, IOError, TypeError):
|
|
continue
|
|
|
|
return schemata
|
|
|
|
|
|
def loadSchema(filename, reload=False, paths=None, **kwargs):
|
|
""" Import a Schema XML file. Loading the same file more than once will
|
|
return the initial instantiation, unless `reload` is `True`.
|
|
|
|
@param filename: The name of the Schema XML file. If the file cannot
|
|
be found and file's path is not absolute, the paths listed in
|
|
`SCHEMA_PATH` will be searched (similar to `sys.path` when
|
|
importing modules).
|
|
@param reload: If `True`, the resulting Schema is guaranteed to be
|
|
new. Note: existing references to previous instances of the
|
|
Schema and/or its elements will not update.
|
|
@param paths: A list of paths to search for schemata, an alternative
|
|
to `ebmlite.SCHEMA_PATH`
|
|
|
|
Additional keyword arguments are sent verbatim to the `Schema`
|
|
constructor.
|
|
|
|
@raises: IOError, ModuleNotFoundError
|
|
"""
|
|
global SCHEMATA
|
|
|
|
paths = paths or SCHEMA_PATH
|
|
origName = str(filename)
|
|
filename = Path(filename)
|
|
|
|
if origName in SCHEMATA and not reload:
|
|
return SCHEMATA[origName]
|
|
|
|
filename = _expandSchemaPath(filename) # raises ModuleNotFoundError
|
|
|
|
if not filename.is_file():
|
|
if len(filename.parts) == 1:
|
|
# Not a specific path and file not found: search paths in SCHEMA_PATH
|
|
for p in paths:
|
|
try:
|
|
f = _expandSchemaPath(p, filename)
|
|
if f.is_file():
|
|
filename = f
|
|
break
|
|
except ModuleNotFoundError:
|
|
continue
|
|
|
|
if hasattr(filename, 'expanduser'):
|
|
filename = filename.expanduser().absolute()
|
|
|
|
if str(filename) in SCHEMATA and not reload:
|
|
return SCHEMATA[str(filename)]
|
|
|
|
if not filename.is_file():
|
|
raise IOError(errno.ENOENT, 'Could not find schema XML', origName)
|
|
|
|
with filename.open() as fs:
|
|
schema = Schema(fs, **kwargs)
|
|
|
|
SCHEMATA[str(filename)] = SCHEMATA[origName] = schema
|
|
return schema
|
|
|
|
|
|
def parseSchema(src, name=None, reload=False, **kwargs):
|
|
""" Read Schema XML data from a string or stream. Loading one with the
|
|
same `name` will return the initial instantiation, unless `reload`
|
|
is `True`. Calls to `loadSchema()` using a name previously used with
|
|
`parseSchema()` will also return the previously instantiated Schema.
|
|
|
|
@param src: The XML string, or a stream containing XML.
|
|
@param name: The name of the schema. If none is supplied,
|
|
the name defined within the schema will be used.
|
|
@param reload: If `True`, the resulting Schema is guaranteed to be
|
|
new. Note: existing references to previous instances of the
|
|
Schema and/or its elements will not update.
|
|
|
|
Additional keyword arguments are sent verbatim to the `Schema`
|
|
constructor.
|
|
"""
|
|
global SCHEMATA
|
|
|
|
if name in SCHEMATA and not reload:
|
|
return SCHEMATA[name]
|
|
|
|
if isinstance(src, IOBase):
|
|
stream = src
|
|
else:
|
|
stream = StringIO(src)
|
|
|
|
schema = Schema(stream, **kwargs)
|
|
name = name or schema.name
|
|
SCHEMATA[name] = schema
|
|
return schema
|