476 lines
18 KiB
Python
476 lines
18 KiB
Python
"""
|
|
Some utilities for manipulating EBML documents: translate to/from XML, etc.
|
|
This module may be imported or used as a command-line utility.
|
|
|
|
Created on Aug 11, 2017
|
|
|
|
@todo: Clean up and standardize usage of the term 'size' versus 'length.'
|
|
@todo: Modify (or create an alternate version of) `toXml()` that writes
|
|
directly to a file, allowing the conversion of huge EBML files.
|
|
@todo: Add other options to command-line utility for the other arguments of
|
|
`toXml()` and `xml2ebml()`.
|
|
"""
|
|
# Authorship and licensing metadata for this module.
__author__ = "David Randall Stokes, Connor Flanigan"
__copyright__ = "Copyright 2021, Mide Technology Corporation"
__credits__ = "David Randall Stokes, Connor Flanigan, Becker Awqatty, Derek Witt"

# The names exported by a star-import of this module.
__all__ = [
    'createID',
    'validateID',
    'toXml',
    'xml2ebml',
    'loadXml',
    'pprint',
    'printSchemata',
]
|
|
|
|
import ast
|
|
from base64 import b64encode, b64decode
|
|
from io import StringIO
|
|
import pathlib
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from . import core, encoding, decoding
|
|
from . import xml_codecs
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
|
|
def createID(schema, idClass, exclude=(), minId=0x81, maxId=0x1FFFFFFE, count=1):
    """ Generate unique EBML IDs. Primarily intended for use 'offline' by
        humans creating EBML schemata.

        @param schema: The `Schema` in which the new IDs must coexist. Only
            the keys of its `elements` mapping are consulted; they are
            treated as IDs that are already taken.
        @param idClass: The EBML class of ID, one of (case-insensitive):
            * `'a'`: Class A (1 octet, base 0x8X)
            * `'b'`: Class B (2 octets, base 0x4000)
            * `'c'`: Class C (3 octets, base 0x200000)
            * `'d'`: Class D (4 octets, base 0x10000000)
        @param exclude: A list of additional IDs to avoid.
        @param minId: The minimum ID value, within the ID class' range.
        @param maxId: The maximum ID value (inclusive), within the ID class'
            range.
        @param count: The maximum number of IDs to generate. The result may be
            fewer than specified if too few meet the given criteria.
        @return: A list of EBML IDs that match the given criteria, in
            ascending order.
        @raise KeyError: If `idClass` is not one of the four ID classes.
    """
    ranges = dict(A=(0x81, 0xFE),
                  B=(0x407F, 0x7FFE),
                  C=(0x203FFF, 0x3FFFFE),
                  D=(0x101FFFFF, 0x1FFFFFFE))
    idc = idClass.upper()
    if idc not in ranges:
        raise KeyError('Invalid ID class %r: must be one of %r' %
                       (idClass, list(ranges)))

    # Keep range within the one specified and the one imposed by the ID class
    classMin, classMax = ranges[idc]
    firstId = max(classMin, minId)
    lastId = min(classMax, maxId)

    exclude = set(exclude).union(schema.elements.keys())

    result = []
    # Both bounds are valid IDs, so the upper bound must be included; a bare
    # `range(firstId, lastId)` would never yield `lastId` itself.
    for candidate in range(firstId, lastId + 1):
        if len(result) == count:
            break
        if candidate not in exclude:
            result.append(candidate)

    return result
|
|
|
|
|
|
def validateID(elementId):
    """ Check that a number is usable as an EBML element ID, raising a
        `ValueError` if it is not.

        The four classes of EBML ID occupy these ranges:
          * A: 0x81 to 0xFE
          * B: 0x407F to 0x7FFE
          * C: 0x203FFF to 0x3FFFFE
          * D: 0x101FFFFF to 0x1FFFFFFE

        @param elementId: The element ID to validate
        @return: `True` if the ID is valid (invalid IDs raise instead).
        @raises: `ValueError`, although certain edge cases may raise
            another type.
    """
    classBounds = ((0x81, 0xFE), (0x407F, 0x7FFE), (0x203FFF, 0x3FFFFE),
                   (0x101FFFFF, 0x1FFFFFFE))

    # Coarse check: reject anything outside the span of all four classes.
    if not 0x81 <= elementId <= 0x1FFFFFFE:
        raise ValueError("Element ID out of range", elementId)

    reason = "Invalid element ID"  # Used unless something more specific is found
    ok = False

    try:
        # The leading byte of the big-endian form encodes the ID's length;
        # it has to agree with the number of bytes the ID actually uses.
        raw = struct.pack(">I", elementId).lstrip(b'\x00')
        idLength, _ = decoding.decodeIDLength(raw[0])
        if len(raw) == idLength:
            lowest, highest = classBounds[idLength - 1]
            if lowest <= elementId <= highest:
                ok = True
            else:
                reason = "ID out of range for class %s %s" % (" ABCD"[idLength], classBounds[idLength - 1])

    # Note: Change this if decoding changes the exceptions it raises
    except OSError as err:
        if err.args:
            reason = err.args[0]

    if not ok:
        raise ValueError(reason, elementId)

    return True
|
|
|
|
# ==============================================================================
|
|
#
|
|
# ==============================================================================
|
|
|
|
|
|
def toXml(el, parent=None, offsets=True, sizes=True, types=True, ids=True,
          binary_codec='base64', void_codec='ignore'):
    """ Render an EBML Document or element as an XML element tree. Master
        elements recurse into their children; binary and Void elements
        carry codec-encoded text in their body; every other element stores
        its value in a ``value`` attribute.

        @param el: An instance of an EBML Element or Document subclass.
        @keyword parent: The resulting XML element's parent element, if any.
        @keyword offsets: If `True`, create an ``offset`` attribute for each
            generated XML element, containing the corresponding EBML element's
            offset.
        @keyword sizes: If `True`, create ``size`` attributes containing the
            corresponding EBML element's size. Void elements always get a
            ``size`` attribute, regardless of this setting.
        @keyword types: If `True`, create ``type`` attributes containing the
            name of the corresponding EBML element type.
        @keyword ids: If `True`, create ``id`` attributes containing the
            corresponding EBML element's EBML ID (as hexadecimal).
        @keyword binary_codec: The name of an XML codec class from
            `ebmlite.xml_codecs`, or an instance of a codec, for rendering
            binary elements as text.
        @keyword void_codec: The name of an XML codec class from
            `ebmlite.xml_codecs`, or an instance of a codec, for rendering
            the contents of Void elements as text.
        @return The root XML element of the file.
    """
    # Codec names are resolved once here; recursive calls receive instances.
    if isinstance(binary_codec, str):
        binary_codec = xml_codecs.BINARY_CODECS[binary_codec]()
    if isinstance(void_codec, str):
        void_codec = xml_codecs.BINARY_CODECS[void_codec]()

    isDocument = isinstance(el, core.Document)
    tag = el.__class__.__name__ if isDocument else el.name
    node = ET.Element(tag) if parent is None else ET.SubElement(parent, tag)

    if isDocument:
        node.set('source', el.filename)
        node.set('schemaName', el.schema.name)
        node.set('schemaFile', el.schema.filename)
    else:
        if ids and isinstance(el.id, int):
            node.set('id', "0x%X" % el.id)
        if types:
            node.set('type', el.dtype.__name__)

    if offsets:
        node.set('offset', str(el.offset))
    if sizes:
        node.set('size', str(el.size))

    # The Void test has to come before the Binary test so that Void
    # elements get their own codec and their unconditional ``size``.
    if isinstance(el, core.MasterElement):
        for child in el:
            toXml(child, node, offsets, sizes, types, ids, binary_codec, void_codec)
    elif isinstance(el, core.VoidElement):
        node.set('size', str(el.size))
        if void_codec.NAME != 'ignore':
            node.set('encoding', void_codec.NAME)
            node.text = void_codec.encode(el.value)
    elif isinstance(el, core.BinaryElement):
        node.set('encoding', binary_codec.NAME)
        node.text = binary_codec.encode(el.value, offset=el.offset)
    else:
        # Non-ASCII characters become XML character references.
        asText = str(el.value)
        node.set('value', asText.encode('ascii', 'xmlcharrefreplace').decode())

    return node
|
|
|
|
|
|
#===============================================================================
|
|
#
|
|
#===============================================================================
|
|
|
|
def xmlElement2ebml(xmlEl, ebmlFile, schema, sizeLength=None, unknown=True):
    """ Convert an XML element to EBML, recursing if necessary. For converting
        an entire XML document, use `xml2ebml()`.

        @param xmlEl: The XML element. Its tag must match an element defined
            in the `schema`. Elements whose tag is not a string (e.g.
            comments) are skipped.
        @param ebmlFile: An open file-like stream, to which the EBML data will
            be written. Must support `tell()` and `seek()`, which are used to
            go back and fill in the size of 'master' elements.
        @param schema: An `ebmlite.core.Schema` instance to use when
            writing the EBML document.
        @keyword sizeLength: The default length of the size descriptor of
            'master' elements. If an XML element has a ``sizeLength``
            attribute, it will override this. If `None`, the length is
            derived from the element's ``size`` attribute, falling back to 4.
            Non-master elements only use their own ``sizeLength`` attribute.
        @param unknown: If `True`, unknown element names will be allowed,
            provided their XML elements include an ``id`` attribute with the
            EBML ID (in hexadecimal). Applies to child elements as well.
        @return The length of the encoded element, including header and children.
        @raise NameError: raised if an xml element is not present in the schema and unknown is False, OR if the xml
            element does not have an ID.
    """
    if not isinstance(xmlEl.tag, (str, bytes, bytearray)):
        # (Probably) a comment; disregard.
        return 0

    try:
        cls = schema[xmlEl.tag]
        encId = encoding.encodeId(cls.id)
    except (KeyError, AttributeError):
        # Element name not in schema. Go ahead if allowed (`unknown` is `True`)
        # and the XML element specifies an ID,
        if not unknown:
            raise NameError("Unrecognized EBML element name: %s" % xmlEl.tag)

        eid = xmlEl.get('id', None)
        if eid is None:
            raise NameError("Unrecognized EBML element name with no 'id' "
                            "attribute in XML: %s" % xmlEl.tag)
        cls = core.UnknownElement
        encId = encoding.encodeId(int(eid, 16))
        # NOTE(review): this assigns the ID on the shared `UnknownElement`
        # class itself, so the value persists after this call; presumably
        # `cls.encode()` below reads it. Confirm before changing.
        cls.id = int(eid, 16)

    codec = xmlEl.get('encoding', 'base64')

    # Work out the length of the size descriptor (used by master elements).
    if sizeLength is None:
        sl = xmlEl.get('sizeLength', None)
        if sl is None:
            s = xmlEl.get('size', None)
            if s is not None:
                sl = encoding.getLength(int(s))
            else:
                sl = 4
        else:
            sl = int(sl)
    else:
        # XML attributes are strings; convert so both sources yield an int.
        sl = int(xmlEl.get('sizeLength', sizeLength))

    if issubclass(cls, core.MasterElement):
        ebmlFile.write(encId)
        sizePos = ebmlFile.tell()
        # Write a placeholder size; the real one is only known once all the
        # children have been written.
        ebmlFile.write(encoding.encodeSize(None, sl))
        size = 0
        for chEl in xmlEl:
            # Pass `unknown` along so the restriction applies at every depth.
            size += xmlElement2ebml(chEl, ebmlFile, schema, sl,
                                    unknown=unknown)
        endPos = ebmlFile.tell()
        ebmlFile.seek(sizePos)
        ebmlFile.write(encoding.encodeSize(size, sl))
        ebmlFile.seek(endPos)
        return len(encId) + (endPos - sizePos)

    elif issubclass(cls, core.BinaryElement):
        val = xml_codecs.BINARY_CODECS[codec].decode(xmlEl.text)
    elif issubclass(cls, (core.IntegerElement, core.FloatElement)):
        # `literal_eval` only accepts Python literals, so it is safe to use
        # on attribute text.
        val = ast.literal_eval(xmlEl.get('value'))
    else:
        val = cls.dtype(xmlEl.get('value'))

    # Non-master elements: only explicit XML attributes constrain the
    # encoded size and the length of the size descriptor.
    size = xmlEl.get('size', None)
    if size is not None:
        size = int(size)
    sl = xmlEl.get('sizeLength')
    if sl is not None:
        sl = int(sl)

    encoded = cls.encode(val, size, lengthSize=sl)
    ebmlFile.write(encoded)
    return len(encoded)
|
|
|
|
|
|
def xml2ebml(xmlFile, ebmlFile, schema, sizeLength=None, headers=True,
             unknown=True):
    """ Convert an XML file to EBML.

        @todo: Convert XML on the fly, rather than parsing it first, allowing
            for the conversion of arbitrarily huge files.

        @param xmlFile: The XML source. Can be a filename, an open file-like
            stream, or a parsed XML document.
        @param ebmlFile: The EBML file to write. Can be a filename (string or
            `pathlib.Path`) or an open file-like stream. A file opened here
            is always closed again, even on failure; a stream passed in is
            left open.
        @param schema: The EBML schema to use. Can be a filename or an
            instance of a `Schema`.
        @keyword sizeLength: The default length of each element's size
            descriptor. Must be large enough to store the largest 'master'
            element. If an XML element has a ``sizeLength`` attribute, it will
            override this.
        @keyword headers: If `True`, generate the standard ``EBML`` EBML
            element if the XML document does not contain one.
        @param unknown: If `True`, unknown element names will be allowed,
            provided their XML elements include an ``id`` attribute with the
            EBML ID (in hexadecimal).
        @return: the size of the ebml file in bytes.
        @raise NameError: raises if an xml element is not present in the schema.
    """
    if isinstance(ebmlFile, (str, bytes, bytearray, pathlib.Path)):
        ebmlFile = open(ebmlFile, 'wb')
        openedEbml = True
    else:
        openedEbml = False

    # Everything after the file is opened can raise (schema loading, XML
    # parsing, unknown elements), so the cleanup lives in `finally`.
    try:
        if not isinstance(schema, core.Schema):
            schema = core.loadSchema(schema)

        if isinstance(xmlFile, ET.Element):
            # Already a parsed XML element
            xmlRoot = xmlFile
        elif isinstance(xmlFile, ET.ElementTree):
            # Already a parsed XML document
            xmlRoot = xmlFile.getroot()
        else:
            xmlDoc = ET.parse(xmlFile)
            xmlRoot = xmlDoc.getroot()

        if xmlRoot.tag not in schema and xmlRoot.tag != schema.document.__name__:
            raise NameError("XML element %s not an element or document in "
                            "schema %s (wrong schema)" % (xmlRoot.tag, schema.name))

        # Only write generated headers if the schema defines an ``EBML``
        # element and the XML does not already provide one.
        headers = headers and 'EBML' in schema
        if headers and 'EBML' not in (el.tag for el in xmlRoot):
            pos = ebmlFile.tell()
            cls = schema.document
            ebmlFile.write(cls.encodePayload(cls._createHeaders()))
            numBytes = ebmlFile.tell() - pos
        else:
            numBytes = 0

        if xmlRoot.tag == schema.document.__name__:
            # The root is the document itself: convert each child.
            for el in xmlRoot:
                numBytes += xmlElement2ebml(el, ebmlFile, schema, sizeLength,
                                            unknown=unknown)
        else:
            # The root is a single EBML element.
            numBytes += xmlElement2ebml(xmlRoot, ebmlFile, schema, sizeLength,
                                        unknown=unknown)

        return numBytes

    finally:
        if openedEbml:
            ebmlFile.close()
|
|
|
|
#===============================================================================
|
|
#
|
|
#===============================================================================
|
|
|
|
|
|
def loadXml(xmlFile, schema, ebmlFile=None):
    """ Helpful utility to load an EBML document from an XML file. The XML
        is first converted to EBML with `xml2ebml()`, and the result is then
        loaded using the schema.

        @param xmlFile: The XML source. Can be a filename, an open file-like
            stream, or a parsed XML document.
        @param schema: The EBML schema to use. Can be a filename or an
            instance of a `Schema`.
        @keyword ebmlFile: The name of the temporary EBML file to write, or
            ``:memory:`` to use RAM (like `sqlite3`). Defaults to an
            automatically-generated temporary file. The file is not deleted
            by this function.
        @return The root node of the specified EBML file.
    """
    # Local import: the in-memory buffer must hold bytes, not text.
    from io import BytesIO

    # Accept a schema filename, as documented (and as `xml2ebml()` does).
    if not isinstance(schema, core.Schema):
        schema = core.loadSchema(schema)

    if ebmlFile == ":memory:":
        # EBML is binary data, so a text buffer (`StringIO`) cannot be used.
        ebmlFile = BytesIO()
        xml2ebml(xmlFile, ebmlFile, schema)
        ebmlFile.seek(0)
    else:
        if ebmlFile is None:
            # Create the file securely and keep only its name;
            # `tempfile.mktemp()` is deprecated because another process can
            # claim the name before the file gets created.
            with tempfile.NamedTemporaryFile(suffix='.ebml', delete=False) as tmp:
                ebmlFile = tmp.name
        xml2ebml(xmlFile, ebmlFile, schema)

    return schema.load(ebmlFile)
|
|
|
|
|
|
#===============================================================================
|
|
#
|
|
#===============================================================================
|
|
|
|
def pprint(el, values=True, out=sys.stdout, indent=" ", binary_codec="ignore",
           void_codec="ignore", _depth=0):
    """ Test function that walks an EBML document or element recursively
        and writes an outline of it, one element per line, with children
        indented beneath their parent.

        @param el: An instance of a `Document` or `Element` subclass.
        @keyword values: If `True`, show elements' values.
        @keyword out: A file-like stream to which to write.
        @keyword indent: The string containing the character(s) used for each
            indentation.
        @keyword binary_codec: The name of a class from `ebmlite.xml_codecs`,
            or an instance of a codec, for rendering binary elements as text.
        @keyword void_codec: The name of a class from `ebmlite.xml_codecs`,
            or an instance of a codec, for rendering the contents of Void
            elements as text.
    """
    prefix = indent * _depth

    # Codec names are resolved once; recursive calls receive instances.
    if isinstance(binary_codec, str):
        binary_codec = xml_codecs.BINARY_CODECS[binary_codec]()
    if isinstance(void_codec, str):
        void_codec = xml_codecs.BINARY_CODECS[void_codec]()

    # The column headings are written by the outermost call only.
    if _depth == 0:
        if values:
            heading = "Offset Size Element (ID): Value\n"
        else:
            heading = "Offset Size Element (ID)\n"
        out.write(heading)
        out.write("====== ====== =================================\n")

    children = ()  # Stays empty for elements that have no children

    if isinstance(el, core.Document):
        out.write("%06s %06s %s %s (Document, type %s)\n" % (el.offset, el.size, prefix, el.name, el.type))
        children = el
    else:
        out.write("%06s %06s %s %s (ID 0x%0X)" % (el.offset, el.size, prefix, el.name, el.id))
        if isinstance(el, core.MasterElement):
            out.write(": (master) %d subelements\n" % len(el.value))
            children = el
        else:
            out.write(": (%s)" % el.dtype.__name__)
            if values:
                if not isinstance(el, core.BinaryElement):
                    out.write(" %r" % (el.value))
                else:
                    # The codecs write their output straight to the stream,
                    # using this indentation.
                    hanging = prefix + " " * 17
                    if isinstance(el, core.VoidElement) and void_codec.NAME != 'ignore':
                        out.write(" <{}>".format(void_codec.NAME))
                        void_codec.encode(el.value, offset=el.offset, indent=hanging, stream=out)
                    elif binary_codec.NAME != 'ignore':
                        out.write(" <{}>".format(binary_codec.NAME))
                        binary_codec.encode(el.value, offset=el.offset, indent=hanging, stream=out)
            out.write("\n")

    for child in children:
        pprint(child, values, out, indent, binary_codec, void_codec, _depth+1)

    out.flush()
|
|
|
|
|
|
#===============================================================================
|
|
#
|
|
#===============================================================================
|
|
|
|
def printSchemata(paths=None, out=sys.stdout, absolute=True):
    """ Display a list of schemata in `SCHEMA_PATH`. A thin wrapper for the
        core `listSchemata()` function.

        @param paths: Additional paths to search, if any. These are searched
            along with `SCHEMA_PATH`. May be a single path or a sequence of
            paths; the caller's sequence is not modified.
        @param out: A file-like stream to which to write, or the name of a
            file to create. A file created here is closed before returning.
        @param absolute: Passed through to `listSchemata()`.
    """
    out = out or sys.stdout
    newfile = isinstance(out, (str, pathlib.Path))
    if newfile:
        out = open(out, 'w')

    try:
        if paths:
            if isinstance(paths, (str, pathlib.Path)):
                # A lone path, rather than a sequence of them
                paths = [paths]
            # Build a new list; extending `paths` in place would alter the
            # caller's list (and would fail outright for a tuple).
            searchPaths = list(paths) + list(core.SCHEMA_PATH)
        else:
            searchPaths = core.SCHEMA_PATH
        schemata = core.listSchemata(*searchPaths, absolute=absolute)
        for k, v in schemata.items():
            out.write("{}\n".format(k))
            for s in v:
                out.write(" {}\n".format(s))
        out.flush()
    finally:
        if newfile:
            out.close()
|