You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
7.1 KiB
242 lines
7.1 KiB
from ...exceptions import ReadError, SizeError
|
|
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
from struct import unpack
|
|
|
|
|
|
__all__ = [
|
|
"read_element_id",
|
|
"read_element_size",
|
|
"read_element_integer",
|
|
"read_element_uinteger",
|
|
"read_element_float",
|
|
"read_element_string",
|
|
"read_element_unicode",
|
|
"read_element_date",
|
|
"read_element_binary",
|
|
]
|
|
|
|
|
|
def _read(stream, size):
|
|
"""Read the `stream` for *exactly* `size` bytes and raise an exception if
|
|
less than `size` bytes are actually read
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: number of bytes to read
|
|
:raise ReadError: when less than `size` bytes are actually read
|
|
:return: read data from the `stream`
|
|
:rtype: bytes
|
|
|
|
"""
|
|
data = stream.read(size)
|
|
if len(data) < size:
|
|
raise ReadError("Less than %d bytes read (%d)" % (size, len(data)))
|
|
return data
|
|
|
|
|
|
def read_element_id(stream):
|
|
"""Read the Element ID
|
|
|
|
:param stream: file-like object from which to read
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:return: the id of the element
|
|
:rtype: int
|
|
|
|
"""
|
|
char = _read(stream, 1)
|
|
byte = ord(char)
|
|
if byte & 0x80:
|
|
return byte
|
|
elif byte & 0x40:
|
|
return unpack(">H", char + _read(stream, 1))[0]
|
|
elif byte & 0x20:
|
|
b, h = unpack(">BH", char + _read(stream, 2))
|
|
return b * 2**16 + h
|
|
elif byte & 0x10:
|
|
return unpack(">L", char + _read(stream, 3))[0]
|
|
else:
|
|
ValueError("Not an Element ID")
|
|
|
|
|
|
def read_element_size(stream):
|
|
"""Read the Element Size
|
|
|
|
:param stream: file-like object from which to read
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:return: the size of element's data
|
|
:rtype: int
|
|
|
|
"""
|
|
char = _read(stream, 1)
|
|
byte = ord(char)
|
|
if byte & 0x80:
|
|
return unpack(">B", bytes((byte ^ 0x80,)))[0]
|
|
elif byte & 0x40:
|
|
return unpack(">H", bytes((byte ^ 0x40,)) + _read(stream, 1))[0]
|
|
elif byte & 0x20:
|
|
b, h = unpack(">BH", bytes((byte ^ 0x20,)) + _read(stream, 2))
|
|
return b * 2**16 + h
|
|
elif byte & 0x10:
|
|
return unpack(">L", bytes((byte ^ 0x10,)) + _read(stream, 3))[0]
|
|
elif byte & 0x08:
|
|
b, l = unpack(">BL", bytes((byte ^ 0x08,)) + _read(stream, 4))
|
|
return b * 2**32 + l
|
|
elif byte & 0x04:
|
|
h, l = unpack(">HL", bytes((byte ^ 0x04,)) + _read(stream, 5))
|
|
return h * 2**32 + l
|
|
elif byte & 0x02:
|
|
b, h, l = unpack(">BHL", bytes((byte ^ 0x02,)) + _read(stream, 6))
|
|
return b * 2**48 + h * 2**32 + l
|
|
elif byte & 0x01:
|
|
return unpack(">Q", bytes((byte ^ 0x01,)) + _read(stream, 7))[0]
|
|
else:
|
|
ValueError("Not an Element Size")
|
|
|
|
|
|
def read_element_integer(stream, size):
|
|
"""Read the Element Data of type :data:`INTEGER`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read integer
|
|
:rtype: int
|
|
|
|
"""
|
|
if size == 1:
|
|
return unpack(">b", _read(stream, 1))[0]
|
|
elif size == 2:
|
|
return unpack(">h", _read(stream, 2))[0]
|
|
elif size == 3:
|
|
b, h = unpack(">bH", _read(stream, 3))
|
|
return b * 2**16 + h
|
|
elif size == 4:
|
|
return unpack(">l", _read(stream, 4))[0]
|
|
elif size == 5:
|
|
b, l = unpack(">bL", _read(stream, 5))
|
|
return b * 2**32 + l
|
|
elif size == 6:
|
|
h, l = unpack(">hL", _read(stream, 6))
|
|
return h * 2**32 + l
|
|
elif size == 7:
|
|
b, h, l = unpack(">bHL", _read(stream, 7))
|
|
return b * 2**48 + h * 2**32 + l
|
|
elif size == 8:
|
|
return unpack(">q", _read(stream, 8))[0]
|
|
else:
|
|
raise SizeError(size)
|
|
|
|
|
|
def read_element_uinteger(stream, size):
|
|
"""Read the Element Data of type :data:`UINTEGER`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read unsigned integer
|
|
:rtype: int
|
|
|
|
"""
|
|
if size == 1:
|
|
return unpack(">B", _read(stream, 1))[0]
|
|
elif size == 2:
|
|
return unpack(">H", _read(stream, 2))[0]
|
|
elif size == 3:
|
|
b, h = unpack(">BH", _read(stream, 3))
|
|
return b * 2**16 + h
|
|
elif size == 4:
|
|
return unpack(">L", _read(stream, 4))[0]
|
|
elif size == 5:
|
|
b, l = unpack(">BL", _read(stream, 5))
|
|
return b * 2**32 + l
|
|
elif size == 6:
|
|
h, l = unpack(">HL", _read(stream, 6))
|
|
return h * 2**32 + l
|
|
elif size == 7:
|
|
b, h, l = unpack(">BHL", _read(stream, 7))
|
|
return b * 2**48 + h * 2**32 + l
|
|
elif size == 8:
|
|
return unpack(">Q", _read(stream, 8))[0]
|
|
else:
|
|
raise SizeError(size)
|
|
|
|
|
|
def read_element_float(stream, size):
|
|
"""Read the Element Data of type :data:`FLOAT`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read float
|
|
:rtype: float
|
|
|
|
"""
|
|
if size == 4:
|
|
return unpack(">f", _read(stream, 4))[0]
|
|
elif size == 8:
|
|
return unpack(">d", _read(stream, 8))[0]
|
|
else:
|
|
raise SizeError(size)
|
|
|
|
|
|
def read_element_string(stream, size):
|
|
"""Read the Element Data of type :data:`STRING`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read ascii-decoded string
|
|
:rtype: unicode
|
|
|
|
"""
|
|
return _read(stream, size).decode("ascii")
|
|
|
|
|
|
def read_element_unicode(stream, size):
|
|
"""Read the Element Data of type :data:`UNICODE`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read utf-8-decoded string
|
|
:rtype: unicode
|
|
|
|
"""
|
|
return _read(stream, size).decode("utf-8")
|
|
|
|
|
|
def read_element_date(stream, size):
|
|
"""Read the Element Data of type :data:`DATE`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read date
|
|
:rtype: datetime
|
|
|
|
"""
|
|
if size != 8:
|
|
raise SizeError(size)
|
|
nanoseconds = unpack(">q", _read(stream, 8))[0]
|
|
return datetime(2001, 1, 1, 0, 0, 0, 0, None) + timedelta(microseconds=nanoseconds // 1000)
|
|
|
|
|
|
def read_element_binary(stream, size):
|
|
"""Read the Element Data of type :data:`BINARY`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: raw binary data
|
|
:rtype: bytes
|
|
|
|
"""
|
|
return BytesIO(stream.read(size))
|