diff --git a/.gitignore b/.gitignore index ed10b9c..9d0a9c1 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ __pycache__ /*.egg-info/ /.cache +/.pytest_cache /.coverage diff --git a/blender_asset_tracer/blendfile/__init__.py b/blender_asset_tracer/blendfile/__init__.py index 4f09351..2482e04 100644 --- a/blender_asset_tracer/blendfile/__init__.py +++ b/blender_asset_tracer/blendfile/__init__.py @@ -29,7 +29,7 @@ import pathlib import tempfile import typing -from . import exceptions, dna_io, dna +from . import exceptions, dna_io, dna, header log = logging.getLogger(__name__) @@ -94,7 +94,7 @@ class BlendFile: elif magic != BLENDFILE_MAGIC: raise exceptions.BlendFileError("File is not a blend file", path) - self.header = BlendFileHeader(self.fileobj, self.raw_filepath) + self.header = header.BlendFileHeader(self.fileobj, self.raw_filepath) self.block_header_struct = self.header.create_block_header_struct() self.blocks = [] self.code_index = collections.defaultdict(list) @@ -171,9 +171,10 @@ class BlendFile: DNACatalog is a catalog of all information in the DNA1 file-block """ self.log.debug("building DNA catalog") - shortstruct = self.header.types.USHORT - shortstruct2 = self.header.types.USHORT2 - intstruct = self.header.types.UINT + endian = self.header.endian + shortstruct = endian.USHORT + shortstruct2 = endian.USHORT2 + intstruct = endian.UINT assert intstruct.size == 4 data = self.fileobj.read(block.size) @@ -188,7 +189,7 @@ class BlendFile: self.log.debug("building #%d names" % names_len) for _ in range(names_len): - typename = dna_io.read_data0_offset(data, offset) + typename = endian.read_data0_offset(data, offset) offset = offset + len(typename) + 1 typenames.append(dna.Name(typename)) @@ -198,7 +199,7 @@ class BlendFile: offset += 4 self.log.debug("building #%d types" % types_len) for _ in range(types_len): - dna_type_id = dna_io.read_data0_offset(data, offset) + dna_type_id = endian.read_data0_offset(data, offset) 
types.append(dna.Struct(dna_type_id)) offset += len(dna_type_id) + 1 @@ -246,54 +247,6 @@ class BlendFile: return structs, sdna_index_from_id -class BlendFileHeader: - """ - BlendFileHeader represents the first 12 bytes of a blend file. - - it contains information about the hardware architecture, which is relevant - to the structure of the rest of the file. - """ - log = log.getChild('BlendFileHeader') - structure = struct.Struct(b'7s1s1s3s') - - def __init__(self, fileobj: typing.BinaryIO, path: pathlib.Path): - self.log.debug("reading blend-file-header %s", path) - header = fileobj.read(self.structure.size) - values = self.structure.unpack(header) - - self.magic = values[0] - - pointer_size_id = values[1] - if pointer_size_id == b'-': - self.pointer_size = 8 - elif pointer_size_id == b'_': - self.pointer_size = 4 - else: - raise exceptions.BlendFileError('invalid pointer size %r' % pointer_size_id, path) - - endian_id = values[2] - if endian_id == b'v': - self.types = dna_io.LittleEndianTypes - self.endian_str = b'<' # indication for struct.Struct() - elif endian_id == b'V': - self.types = dna_io.BigEndianTypes - self.endian_str = b'>' # indication for struct.Struct() - else: - raise exceptions.BlendFileError('invalid endian indicator %r' % endian_id, path) - - version_id = values[3] - self.version = int(version_id) - - def create_block_header_struct(self) -> struct.Struct: - """Create a Struct instance for parsing data block headers.""" - return struct.Struct(b''.join(( - self.endian_str, - b'4sI', - b'I' if self.pointer_size == 4 else b'Q', - b'II', - ))) - - class BlendFileBlock: """ Instance of a struct. 
@@ -421,7 +374,7 @@ class BlendFileBlock: return dna_struct.field_get( self.file.header, self.file.handle, path, default=default, - use_nil=use_nil, use_str=use_str, + nil_terminated=use_nil, as_str=use_str, ) def get_recursive_iter(self, path, path_root=b"", diff --git a/blender_asset_tracer/blendfile/dna.py b/blender_asset_tracer/blendfile/dna.py index e0c4796..b1dfefd 100644 --- a/blender_asset_tracer/blendfile/dna.py +++ b/blender_asset_tracer/blendfile/dna.py @@ -1,6 +1,12 @@ -import os import typing +import os + +from . import dna_io, header + +# Either a simple path b'propname', or a tuple (b'parentprop', b'actualprop', arrayindex) +FieldPath = typing.Union[bytes, typing.Iterable[typing.Union[bytes, int]]] + class Name: """dna.Name is a C-type name stored in the DNA as bytes.""" @@ -74,29 +80,52 @@ class Field: class Struct: """dna.Struct is a C-type structure stored in the DNA.""" - def __init__(self, dna_type_id: bytes): + def __init__(self, dna_type_id: bytes, size: int = None): + """ + :param dna_type_id: name of the struct in C, like b'AlembicObjectPath'. + :param size: only for unit tests; typically set after construction by + BlendFile.decode_structs(). If not set, it is calculated on the fly + when struct.size is evaluated, based on the available fields. 
+ """ self.dna_type_id = dna_type_id + self._size = size self._fields = [] self._fields_by_name = {} def __repr__(self): return '%s(%r)' % (type(self).__qualname__, self.dna_type_id) + @property + def size(self) -> int: + if self._size is None: + if not self._fields: + raise ValueError('Unable to determine size of fieldless %r' % self) + last_field = max(self._fields, key=lambda f: f.offset) + self._size = last_field.offset + last_field.size + return self._size + + @size.setter + def size(self, new_size: int): + self._size = new_size + def append_field(self, field: Field): self._fields.append(field) self._fields_by_name[field.name.name_only] = field def field_from_path(self, pointer_size: int, - path: typing.Union[bytes, typing.Iterable[typing.Union[bytes, int]]]) \ - -> typing.Tuple[typing.Optional[Field], int]: + path: FieldPath) \ + -> typing.Tuple[Field, int]: """ Support lookups as bytes or a tuple of bytes and optional index. C style 'id.name' --> (b'id', b'name') C style 'array[4]' --> (b'array', 4) - :returns: the field itself, and its offset taking into account the optional index. + :returns: the field itself, and its offset taking into account the + optional index. The offset is relative to the start of the struct, + i.e. relative to the BlendFileBlock containing the data. + :raises KeyError: if the field does not exist. 
""" if isinstance(path, (tuple, list)): name = path[0] @@ -124,72 +153,77 @@ class Struct: return field.dna_type.field_from_path(pointer_size, name_tail) offset = field.offset - # fileobj.seek(field.offset, os.SEEK_CUR) if index: if field.name.is_pointer: index_offset = pointer_size * index else: index_offset = field.dna_type.size * index if index_offset >= field.size: - raise OverflowError('path %r is out of bounds of its DNA type' % path) - # fileobj.seek(index_offset, os.SEEK_CUR) + raise OverflowError('path %r is out of bounds of its DNA type %s' % + (path, field.dna_type)) offset += index_offset + return field, offset - def field_get(self, header, handle, path, + def field_get(self, + file_header: header.BlendFileHeader, + fileobj: typing.BinaryIO, + path: FieldPath, default=..., - use_nil=True, use_str=True, + nil_terminated=True, + as_str=True, ): - field = self.field_from_path(header, handle, path) - if field is None: - if default is not ...: - return default - else: - raise KeyError("%r not found in %r (%r)" % - ( - path, [f.dna_name.name_only for f in self._fields], - self.dna_type_id)) + """Read the value of the field from the blend file. + + Assumes the file pointer of `fileobj` is seek()ed to the start of the + struct on disk (e.g. the start of the BlendFileBlock containing the + data). 
+ """ + try: + field, offset = self.field_from_path(file_header.pointer_size, path) + except KeyError: + if default is ...: + raise + return default + + fileobj.seek(offset, os.SEEK_CUR) dna_type = field.dna_type - dna_name = field.dna_name - dna_size = field.dna_size + dna_name = field.name + types = file_header.endian + # Some special cases (pointers, strings/bytes) if dna_name.is_pointer: - return DNA_IO.read_pointer(handle, header) - elif dna_type.dna_type_id == b'int': - if dna_name.array_size > 1: - return [DNA_IO.read_int(handle, header) for i in range(dna_name.array_size)] - return DNA_IO.read_int(handle, header) - elif dna_type.dna_type_id == b'short': - if dna_name.array_size > 1: - return [DNA_IO.read_short(handle, header) for i in range(dna_name.array_size)] - return DNA_IO.read_short(handle, header) - elif dna_type.dna_type_id == b'uint64_t': - if dna_name.array_size > 1: - return [DNA_IO.read_ulong(handle, header) for i in range(dna_name.array_size)] - return DNA_IO.read_ulong(handle, header) - elif dna_type.dna_type_id == b'float': - if dna_name.array_size > 1: - return [DNA_IO.read_float(handle, header) for i in range(dna_name.array_size)] - return DNA_IO.read_float(handle, header) - elif dna_type.dna_type_id == b'char': - if dna_size == 1: + return types.read_pointer(fileobj, file_header.pointer_size) + if dna_type.dna_type_id == b'char': + if field.size == 1: # Single char, assume it's bitflag or int value, and not a string/bytes data... 
- return DNA_IO.read_char(handle, header) - if use_str: - if use_nil: - return DNA_IO.read_string0(handle, dna_name.array_size) - else: - return DNA_IO.read_string(handle, dna_name.array_size) + return types.read_char(fileobj) + if nil_terminated: + data = types.read_bytes0(fileobj, dna_name.array_size) else: - if use_nil: - return DNA_IO.read_bytes0(handle, dna_name.array_size) - else: - return DNA_IO.read_bytes(handle, dna_name.array_size) - else: + data = fileobj.read(dna_name.array_size) + + if as_str: + return data.decode('utf8') + return data + + simple_readers = { + b'int': types.read_int, + b'short': types.read_short, + b'uint64_t': types.read_ulong, + b'float': types.read_float, + } + try: + simple_reader = simple_readers[dna_type.dna_type_id] + except KeyError: raise NotImplementedError("%r exists but isn't pointer, can't resolve field %r" % (path, dna_name.name_only), dna_name, dna_type) + if dna_name.array_size > 1: + return [simple_reader(fileobj) for _ in range(dna_name.array_size)] + return simple_reader(fileobj) + def field_set(self, header, handle, path, value): assert (type(path) == bytes) diff --git a/blender_asset_tracer/blendfile/dna_io.py b/blender_asset_tracer/blendfile/dna_io.py index 36d7e77..78cde8b 100644 --- a/blender_asset_tracer/blendfile/dna_io.py +++ b/blender_asset_tracer/blendfile/dna_io.py @@ -5,7 +5,7 @@ import typing class EndianIO: - UCHAR = struct.Struct(b'= fieldlen: + to_write = data[0:fieldlen] + else: + to_write = data + b'\0' + + fileobj.write(to_write) + + @classmethod + def read_bytes0(cls, fileobj, length): + data = fileobj.read(length) + return cls.read_data0(data) + + @classmethod + def read_data0_offset(cls, data, offset): + add = data.find(b'\0', offset) - offset + return data[offset:offset + add] + + @classmethod + def read_data0(cls, data): + add = data.find(b'\0') + if add < 0: + return data + return data[:add] class LittleEndianTypes(EndianIO): @@ -66,7 +96,7 @@ class LittleEndianTypes(EndianIO): class 
BigEndianTypes(LittleEndianTypes): - UCHAR = struct.Struct(b'>b') + UCHAR = struct.Struct(b'>B') USHORT = struct.Struct(b'>H') USHORT2 = struct.Struct(b'>HH') # two shorts in a row SSHORT = struct.Struct(b'>h') @@ -74,41 +104,3 @@ class BigEndianTypes(LittleEndianTypes): SINT = struct.Struct(b'>i') FLOAT = struct.Struct(b'>f') ULONG = struct.Struct(b'>Q') - - -def write_string(fileobj: typing.BinaryIO, astring: str, fieldlen: int): - assert isinstance(astring, str) - write_bytes(fileobj, astring.encode('utf-8'), fieldlen) - - -def write_bytes(fileobj: typing.BinaryIO, data: bytes, fieldlen: int): - assert isinstance(data, (bytes, bytearray)) - if len(data) >= fieldlen: - to_write = data[0:fieldlen] - else: - to_write = data + b'\0' - - fileobj.write(to_write) - - -def read_bytes0(fileobj, length): - data = fileobj.read(length) - return read_data0(data) - - -def read_string(fileobj, length): - return fileobj.read(length).decode('utf-8') - - -def read_string0(fileobj, length): - return read_bytes0(fileobj, length).decode('utf-8') - - -def read_data0_offset(data, offset): - add = data.find(b'\0', offset) - offset - return data[offset:offset + add] - - -def read_data0(data): - add = data.find(b'\0') - return data[:add] diff --git a/blender_asset_tracer/blendfile/header.py b/blender_asset_tracer/blendfile/header.py new file mode 100644 index 0000000..b9a67b2 --- /dev/null +++ b/blender_asset_tracer/blendfile/header.py @@ -0,0 +1,55 @@ +import logging +import pathlib +import struct +import typing + +from . import dna_io, exceptions + +log = logging.getLogger(__name__) + + +class BlendFileHeader: + """ + BlendFileHeader represents the first 12 bytes of a blend file. + + it contains information about the hardware architecture, which is relevant + to the structure of the rest of the file. 
+ """ + structure = struct.Struct(b'7s1s1s3s') + + def __init__(self, fileobj: typing.BinaryIO, path: pathlib.Path): + log.debug("reading blend-file-header %s", path) + header = fileobj.read(self.structure.size) + values = self.structure.unpack(header) + + self.magic = values[0] + + pointer_size_id = values[1] + if pointer_size_id == b'-': + self.pointer_size = 8 + elif pointer_size_id == b'_': + self.pointer_size = 4 + else: + raise exceptions.BlendFileError('invalid pointer size %r' % pointer_size_id, path) + + endian_id = values[2] + if endian_id == b'v': + self.endian = dna_io.LittleEndianTypes + self.endian_str = b'<' # indication for struct.Struct() + elif endian_id == b'V': + self.endian = dna_io.BigEndianTypes + self.endian_str = b'>' # indication for struct.Struct() + else: + raise exceptions.BlendFileError('invalid endian indicator %r' % endian_id, path) + + version_id = values[3] + self.version = int(version_id) + + def create_block_header_struct(self) -> struct.Struct: + """Create a Struct instance for parsing data block headers.""" + return struct.Struct(b''.join(( + self.endian_str, + b'4sI', + b'I' if self.pointer_size == 4 else b'Q', + b'II', + ))) diff --git a/tests/test_blendfile_dna.py b/tests/test_blendfile_dna.py index f1bc874..f848fbc 100644 --- a/tests/test_blendfile_dna.py +++ b/tests/test_blendfile_dna.py @@ -1,6 +1,9 @@ +import io +import os import unittest +from unittest import mock -from blender_asset_tracer.blendfile import dna +from blender_asset_tracer.blendfile import dna, dna_io class NameTest(unittest.TestCase): @@ -68,18 +71,161 @@ class NameTest(unittest.TestCase): class StructTest(unittest.TestCase): - def test_field_from_path(self): + class FakeHeader: + pointer_size = 8 + endian = dna_io.BigEndianTypes + + def setUp(self): + self.s = dna.Struct(b'AlembicObjectPath') + self.s_char = dna.Struct(b'char', 1) + self.s_float = dna.Struct(b'float', 4) + self.s_uint64 = dna.Struct(b'uint64_t', 8) + self.s_uint128 = 
dna.Struct(b'uint128_t', 16) # non-supported type + + self.f_next = dna.Field(self.s, dna.Name(b'*next'), 8, 0) + self.f_prev = dna.Field(self.s, dna.Name(b'*prev'), 8, 8) + self.f_path = dna.Field(self.s_char, dna.Name(b'path[4096]'), 4096, 16) + self.f_pointer = dna.Field(self.s_char, dna.Name(b'*ptr'), 3 * 8, 4112) + self.f_number = dna.Field(self.s_uint64, dna.Name(b'numbah'), 8, 4136) + self.f_floaty = dna.Field(self.s_float, dna.Name(b'floaty[2]'), 2 * 4, 4144) + self.f_flag = dna.Field(self.s_char, dna.Name(b'bitflag'), 1, 4152) + self.f_bignum = dna.Field(self.s_uint128, dna.Name(b'bignum'), 16, 4153) + + self.s.append_field(self.f_next) + self.s.append_field(self.f_prev) + self.s.append_field(self.f_path) + self.s.append_field(self.f_pointer) + self.s.append_field(self.f_number) + self.s.append_field(self.f_floaty) + self.s.append_field(self.f_flag) + self.s.append_field(self.f_bignum) + + def test_autosize(self): + with self.assertRaises(ValueError): + # Maybe it would be better to just return 0 on empty structs. + # They are actually used in Blendfiles (for example + # AbcArchiveHandle), but when actually loading from a blendfile + # the size property is explicitly set anyway. The situation we + # test here is for manually created Struct instances that don't + # have any fields. 
+ dna.Struct(b'EmptyStruct').size + + # Create AlembicObjectPath as it is actually used in Blender 2.79a s = dna.Struct(b'AlembicObjectPath') f_next = dna.Field(s, dna.Name(b'*next'), 8, 0) f_prev = dna.Field(s, dna.Name(b'*prev'), 8, 8) - f_path = dna.Field(dna.Struct(b'char'), dna.Name(b'path[4096]'), 4096, 16) - f_pointer = dna.Field(dna.Struct(b'char'), dna.Name(b'*ptr'), 3 * 8, 16 + 4096) + f_path = dna.Field(self.s_char, dna.Name(b'path[4096]'), 4096, 16) s.append_field(f_next) s.append_field(f_prev) s.append_field(f_path) - s.append_field(f_pointer) + self.assertEqual(s.size, 4112) + + def test_field_from_path(self): psize = 8 - self.assertEqual(s.field_from_path(psize, b'path'), (f_path, 16)) - self.assertEqual(s.field_from_path(psize, (b'prev', b'path')), (f_path, 16)) - self.assertEqual(s.field_from_path(psize, (b'ptr', 2)), (f_pointer, 16 + 4096 + 2 * psize)) + self.assertEqual(self.s.field_from_path(psize, b'path'), + (self.f_path, 16)) + self.assertEqual(self.s.field_from_path(psize, (b'prev', b'path')), + (self.f_path, 16)) + self.assertEqual(self.s.field_from_path(psize, (b'ptr', 2)), + (self.f_pointer, 16 + 4096 + 2 * psize)) + self.assertEqual(self.s.field_from_path(psize, (b'floaty', 1)), + (self.f_floaty, 4144 + self.s_float.size)) + + with self.assertRaises(OverflowError): + self.s.field_from_path(psize, (b'floaty', 2)) + + with self.assertRaises(KeyError): + self.s.field_from_path(psize, b'non-existant') + + with self.assertRaises(TypeError): + self.s.field_from_path(psize, 'path') + + def test_simple_field_get(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa' + val = self.s.field_get(self.FakeHeader(), fileobj, b'numbah') + + self.assertEqual(val, 0x1020304fffefdfa) + fileobj.seek.assert_called_with(4136, os.SEEK_CUR) + + def test_field_get_default(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.side_effect = RuntimeError + val = 
self.s.field_get(self.FakeHeader(), fileobj, b'nonexistant', default=519871531) + + self.assertEqual(val, 519871531) + fileobj.seek.assert_not_called() + + def test_field_get_nonexistant(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.side_effect = RuntimeError + + with self.assertRaises(KeyError): + self.s.field_get(self.FakeHeader(), fileobj, b'nonexistant') + fileobj.seek.assert_not_called() + + def test_field_get_unsupported_type(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.side_effect = RuntimeError + + with self.assertRaises(NotImplementedError): + self.s.field_get(self.FakeHeader(), fileobj, b'bignum') + fileobj.seek.assert_called_with(4153, os.SEEK_CUR) + + def test_pointer_field_get(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.return_value = b'\xf0\x9f\xa6\x87\x00dum' + val = self.s.field_get(self.FakeHeader(), fileobj, b'ptr') + + self.assertEqual(0xf09fa6870064756d, val) + fileobj.seek.assert_called_with(4112, os.SEEK_CUR) + + def test_string_field_get(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.return_value = b'\xf0\x9f\xa6\x87\x00dummydata' + val = self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=True) + + self.assertEqual('🦇', val) + fileobj.seek.assert_called_with(16, os.SEEK_CUR) + + def test_string_field_get_single_char(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.return_value = b'\xf0' + val = self.s.field_get(self.FakeHeader(), fileobj, b'bitflag') + + self.assertEqual(0xf0, val) + fileobj.seek.assert_called_with(4152, os.SEEK_CUR) + + def test_string_field_get_invalid_utf8(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa' + + with self.assertRaises(UnicodeDecodeError): + self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=True) + + def test_string_field_get_bytes_null_terminated(self): + fileobj = mock.MagicMock(io.BufferedReader) + 
fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa\x00dummydata' + + val = self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=False) + self.assertEqual(b'\x01\x02\x03\x04\xff\xfe\xfd\xfa', val) + fileobj.seek.assert_called_with(16, os.SEEK_CUR) + + def test_string_field_get_bytes(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa\x00dummydata' + + val = self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=False, + nil_terminated=False) + self.assertEqual(b'\x01\x02\x03\x04\xff\xfe\xfd\xfa\x00dummydata', val) + fileobj.seek.assert_called_with(16, os.SEEK_CUR) + + def test_string_field_get_float_array(self): + fileobj = mock.MagicMock(io.BufferedReader) + fileobj.read.side_effect = (b'@333', b'@2\x8f\\') + + val = self.s.field_get(self.FakeHeader(), fileobj, b'floaty') + self.assertAlmostEqual(2.8, val[0]) + self.assertAlmostEqual(2.79, val[1]) + fileobj.seek.assert_called_with(4144, os.SEEK_CUR)