Implemented + tested dna.Struct.field_get
This commit is contained in:
parent
03bf2ac69f
commit
e66b1edaf4
1
.gitignore
vendored
1
.gitignore
vendored
@ -2,4 +2,5 @@
|
|||||||
__pycache__
|
__pycache__
|
||||||
/*.egg-info/
|
/*.egg-info/
|
||||||
/.cache
|
/.cache
|
||||||
|
/.pytest_cache
|
||||||
/.coverage
|
/.coverage
|
||||||
|
|||||||
@ -29,7 +29,7 @@ import pathlib
|
|||||||
import tempfile
|
import tempfile
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from . import exceptions, dna_io, dna
|
from . import exceptions, dna_io, dna, header
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -94,7 +94,7 @@ class BlendFile:
|
|||||||
elif magic != BLENDFILE_MAGIC:
|
elif magic != BLENDFILE_MAGIC:
|
||||||
raise exceptions.BlendFileError("File is not a blend file", path)
|
raise exceptions.BlendFileError("File is not a blend file", path)
|
||||||
|
|
||||||
self.header = BlendFileHeader(self.fileobj, self.raw_filepath)
|
self.header = header.BlendFileHeader(self.fileobj, self.raw_filepath)
|
||||||
self.block_header_struct = self.header.create_block_header_struct()
|
self.block_header_struct = self.header.create_block_header_struct()
|
||||||
self.blocks = []
|
self.blocks = []
|
||||||
self.code_index = collections.defaultdict(list)
|
self.code_index = collections.defaultdict(list)
|
||||||
@ -171,9 +171,10 @@ class BlendFile:
|
|||||||
DNACatalog is a catalog of all information in the DNA1 file-block
|
DNACatalog is a catalog of all information in the DNA1 file-block
|
||||||
"""
|
"""
|
||||||
self.log.debug("building DNA catalog")
|
self.log.debug("building DNA catalog")
|
||||||
shortstruct = self.header.types.USHORT
|
endian = self.header.endian
|
||||||
shortstruct2 = self.header.types.USHORT2
|
shortstruct = endian.USHORT
|
||||||
intstruct = self.header.types.UINT
|
shortstruct2 = endian.USHORT2
|
||||||
|
intstruct = endian.UINT
|
||||||
assert intstruct.size == 4
|
assert intstruct.size == 4
|
||||||
|
|
||||||
data = self.fileobj.read(block.size)
|
data = self.fileobj.read(block.size)
|
||||||
@ -188,7 +189,7 @@ class BlendFile:
|
|||||||
|
|
||||||
self.log.debug("building #%d names" % names_len)
|
self.log.debug("building #%d names" % names_len)
|
||||||
for _ in range(names_len):
|
for _ in range(names_len):
|
||||||
typename = dna_io.read_data0_offset(data, offset)
|
typename = endian.read_data0_offset(data, offset)
|
||||||
offset = offset + len(typename) + 1
|
offset = offset + len(typename) + 1
|
||||||
typenames.append(dna.Name(typename))
|
typenames.append(dna.Name(typename))
|
||||||
|
|
||||||
@ -198,7 +199,7 @@ class BlendFile:
|
|||||||
offset += 4
|
offset += 4
|
||||||
self.log.debug("building #%d types" % types_len)
|
self.log.debug("building #%d types" % types_len)
|
||||||
for _ in range(types_len):
|
for _ in range(types_len):
|
||||||
dna_type_id = dna_io.read_data0_offset(data, offset)
|
dna_type_id = endian.read_data0_offset(data, offset)
|
||||||
types.append(dna.Struct(dna_type_id))
|
types.append(dna.Struct(dna_type_id))
|
||||||
offset += len(dna_type_id) + 1
|
offset += len(dna_type_id) + 1
|
||||||
|
|
||||||
@ -246,54 +247,6 @@ class BlendFile:
|
|||||||
return structs, sdna_index_from_id
|
return structs, sdna_index_from_id
|
||||||
|
|
||||||
|
|
||||||
class BlendFileHeader:
|
|
||||||
"""
|
|
||||||
BlendFileHeader represents the first 12 bytes of a blend file.
|
|
||||||
|
|
||||||
it contains information about the hardware architecture, which is relevant
|
|
||||||
to the structure of the rest of the file.
|
|
||||||
"""
|
|
||||||
log = log.getChild('BlendFileHeader')
|
|
||||||
structure = struct.Struct(b'7s1s1s3s')
|
|
||||||
|
|
||||||
def __init__(self, fileobj: typing.BinaryIO, path: pathlib.Path):
|
|
||||||
self.log.debug("reading blend-file-header %s", path)
|
|
||||||
header = fileobj.read(self.structure.size)
|
|
||||||
values = self.structure.unpack(header)
|
|
||||||
|
|
||||||
self.magic = values[0]
|
|
||||||
|
|
||||||
pointer_size_id = values[1]
|
|
||||||
if pointer_size_id == b'-':
|
|
||||||
self.pointer_size = 8
|
|
||||||
elif pointer_size_id == b'_':
|
|
||||||
self.pointer_size = 4
|
|
||||||
else:
|
|
||||||
raise exceptions.BlendFileError('invalid pointer size %r' % pointer_size_id, path)
|
|
||||||
|
|
||||||
endian_id = values[2]
|
|
||||||
if endian_id == b'v':
|
|
||||||
self.types = dna_io.LittleEndianTypes
|
|
||||||
self.endian_str = b'<' # indication for struct.Struct()
|
|
||||||
elif endian_id == b'V':
|
|
||||||
self.types = dna_io.BigEndianTypes
|
|
||||||
self.endian_str = b'>' # indication for struct.Struct()
|
|
||||||
else:
|
|
||||||
raise exceptions.BlendFileError('invalid endian indicator %r' % endian_id, path)
|
|
||||||
|
|
||||||
version_id = values[3]
|
|
||||||
self.version = int(version_id)
|
|
||||||
|
|
||||||
def create_block_header_struct(self) -> struct.Struct:
|
|
||||||
"""Create a Struct instance for parsing data block headers."""
|
|
||||||
return struct.Struct(b''.join((
|
|
||||||
self.endian_str,
|
|
||||||
b'4sI',
|
|
||||||
b'I' if self.pointer_size == 4 else b'Q',
|
|
||||||
b'II',
|
|
||||||
)))
|
|
||||||
|
|
||||||
|
|
||||||
class BlendFileBlock:
|
class BlendFileBlock:
|
||||||
"""
|
"""
|
||||||
Instance of a struct.
|
Instance of a struct.
|
||||||
@ -421,7 +374,7 @@ class BlendFileBlock:
|
|||||||
return dna_struct.field_get(
|
return dna_struct.field_get(
|
||||||
self.file.header, self.file.handle, path,
|
self.file.header, self.file.handle, path,
|
||||||
default=default,
|
default=default,
|
||||||
use_nil=use_nil, use_str=use_str,
|
nil_terminated=use_nil, as_str=use_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_recursive_iter(self, path, path_root=b"",
|
def get_recursive_iter(self, path, path_root=b"",
|
||||||
|
|||||||
@ -1,6 +1,12 @@
|
|||||||
import os
|
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from . import dna_io, header
|
||||||
|
|
||||||
|
# Either a simple path b'propname', or a tuple (b'parentprop', b'actualprop', arrayindex)
|
||||||
|
FieldPath = typing.Union[bytes, typing.Iterable[typing.Union[bytes, int]]]
|
||||||
|
|
||||||
|
|
||||||
class Name:
|
class Name:
|
||||||
"""dna.Name is a C-type name stored in the DNA as bytes."""
|
"""dna.Name is a C-type name stored in the DNA as bytes."""
|
||||||
@ -74,29 +80,52 @@ class Field:
|
|||||||
class Struct:
|
class Struct:
|
||||||
"""dna.Struct is a C-type structure stored in the DNA."""
|
"""dna.Struct is a C-type structure stored in the DNA."""
|
||||||
|
|
||||||
def __init__(self, dna_type_id: bytes):
|
def __init__(self, dna_type_id: bytes, size: int = None):
|
||||||
|
"""
|
||||||
|
:param dna_type_id: name of the struct in C, like b'AlembicObjectPath'.
|
||||||
|
:param size: only for unit tests; typically set after construction by
|
||||||
|
BlendFile.decode_structs(). If not set, it is calculated on the fly
|
||||||
|
when struct.size is evaluated, based on the available fields.
|
||||||
|
"""
|
||||||
self.dna_type_id = dna_type_id
|
self.dna_type_id = dna_type_id
|
||||||
|
self._size = size
|
||||||
self._fields = []
|
self._fields = []
|
||||||
self._fields_by_name = {}
|
self._fields_by_name = {}
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '%s(%r)' % (type(self).__qualname__, self.dna_type_id)
|
return '%s(%r)' % (type(self).__qualname__, self.dna_type_id)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def size(self) -> int:
|
||||||
|
if self._size is None:
|
||||||
|
if not self._fields:
|
||||||
|
raise ValueError('Unable to determine size of fieldless %r' % self)
|
||||||
|
last_field = max(self._fields, key=lambda f: f.offset)
|
||||||
|
self._size = last_field.offset + last_field.size
|
||||||
|
return self._size
|
||||||
|
|
||||||
|
@size.setter
|
||||||
|
def size(self, new_size: int):
|
||||||
|
self._size = new_size
|
||||||
|
|
||||||
def append_field(self, field: Field):
|
def append_field(self, field: Field):
|
||||||
self._fields.append(field)
|
self._fields.append(field)
|
||||||
self._fields_by_name[field.name.name_only] = field
|
self._fields_by_name[field.name.name_only] = field
|
||||||
|
|
||||||
def field_from_path(self,
|
def field_from_path(self,
|
||||||
pointer_size: int,
|
pointer_size: int,
|
||||||
path: typing.Union[bytes, typing.Iterable[typing.Union[bytes, int]]]) \
|
path: FieldPath) \
|
||||||
-> typing.Tuple[typing.Optional[Field], int]:
|
-> typing.Tuple[Field, int]:
|
||||||
"""
|
"""
|
||||||
Support lookups as bytes or a tuple of bytes and optional index.
|
Support lookups as bytes or a tuple of bytes and optional index.
|
||||||
|
|
||||||
C style 'id.name' --> (b'id', b'name')
|
C style 'id.name' --> (b'id', b'name')
|
||||||
C style 'array[4]' --> (b'array', 4)
|
C style 'array[4]' --> (b'array', 4)
|
||||||
|
|
||||||
:returns: the field itself, and its offset taking into account the optional index.
|
:returns: the field itself, and its offset taking into account the
|
||||||
|
optional index. The offset is relative to the start of the struct,
|
||||||
|
i.e. relative to the BlendFileBlock containing the data.
|
||||||
|
:raises KeyError: if the field does not exist.
|
||||||
"""
|
"""
|
||||||
if isinstance(path, (tuple, list)):
|
if isinstance(path, (tuple, list)):
|
||||||
name = path[0]
|
name = path[0]
|
||||||
@ -124,72 +153,77 @@ class Struct:
|
|||||||
return field.dna_type.field_from_path(pointer_size, name_tail)
|
return field.dna_type.field_from_path(pointer_size, name_tail)
|
||||||
|
|
||||||
offset = field.offset
|
offset = field.offset
|
||||||
# fileobj.seek(field.offset, os.SEEK_CUR)
|
|
||||||
if index:
|
if index:
|
||||||
if field.name.is_pointer:
|
if field.name.is_pointer:
|
||||||
index_offset = pointer_size * index
|
index_offset = pointer_size * index
|
||||||
else:
|
else:
|
||||||
index_offset = field.dna_type.size * index
|
index_offset = field.dna_type.size * index
|
||||||
if index_offset >= field.size:
|
if index_offset >= field.size:
|
||||||
raise OverflowError('path %r is out of bounds of its DNA type' % path)
|
raise OverflowError('path %r is out of bounds of its DNA type %s' %
|
||||||
# fileobj.seek(index_offset, os.SEEK_CUR)
|
(path, field.dna_type))
|
||||||
offset += index_offset
|
offset += index_offset
|
||||||
|
|
||||||
return field, offset
|
return field, offset
|
||||||
|
|
||||||
def field_get(self, header, handle, path,
|
def field_get(self,
|
||||||
|
file_header: header.BlendFileHeader,
|
||||||
|
fileobj: typing.BinaryIO,
|
||||||
|
path: FieldPath,
|
||||||
default=...,
|
default=...,
|
||||||
use_nil=True, use_str=True,
|
nil_terminated=True,
|
||||||
|
as_str=True,
|
||||||
):
|
):
|
||||||
field = self.field_from_path(header, handle, path)
|
"""Read the value of the field from the blend file.
|
||||||
if field is None:
|
|
||||||
if default is not ...:
|
Assumes the file pointer of `fileobj` is seek()ed to the start of the
|
||||||
return default
|
struct on disk (e.g. the start of the BlendFileBlock containing the
|
||||||
else:
|
data).
|
||||||
raise KeyError("%r not found in %r (%r)" %
|
"""
|
||||||
(
|
try:
|
||||||
path, [f.dna_name.name_only for f in self._fields],
|
field, offset = self.field_from_path(file_header.pointer_size, path)
|
||||||
self.dna_type_id))
|
except KeyError:
|
||||||
|
if default is ...:
|
||||||
|
raise
|
||||||
|
return default
|
||||||
|
|
||||||
|
fileobj.seek(offset, os.SEEK_CUR)
|
||||||
|
|
||||||
dna_type = field.dna_type
|
dna_type = field.dna_type
|
||||||
dna_name = field.dna_name
|
dna_name = field.name
|
||||||
dna_size = field.dna_size
|
types = file_header.endian
|
||||||
|
|
||||||
|
# Some special cases (pointers, strings/bytes)
|
||||||
if dna_name.is_pointer:
|
if dna_name.is_pointer:
|
||||||
return DNA_IO.read_pointer(handle, header)
|
return types.read_pointer(fileobj, file_header.pointer_size)
|
||||||
elif dna_type.dna_type_id == b'int':
|
if dna_type.dna_type_id == b'char':
|
||||||
if dna_name.array_size > 1:
|
if field.size == 1:
|
||||||
return [DNA_IO.read_int(handle, header) for i in range(dna_name.array_size)]
|
|
||||||
return DNA_IO.read_int(handle, header)
|
|
||||||
elif dna_type.dna_type_id == b'short':
|
|
||||||
if dna_name.array_size > 1:
|
|
||||||
return [DNA_IO.read_short(handle, header) for i in range(dna_name.array_size)]
|
|
||||||
return DNA_IO.read_short(handle, header)
|
|
||||||
elif dna_type.dna_type_id == b'uint64_t':
|
|
||||||
if dna_name.array_size > 1:
|
|
||||||
return [DNA_IO.read_ulong(handle, header) for i in range(dna_name.array_size)]
|
|
||||||
return DNA_IO.read_ulong(handle, header)
|
|
||||||
elif dna_type.dna_type_id == b'float':
|
|
||||||
if dna_name.array_size > 1:
|
|
||||||
return [DNA_IO.read_float(handle, header) for i in range(dna_name.array_size)]
|
|
||||||
return DNA_IO.read_float(handle, header)
|
|
||||||
elif dna_type.dna_type_id == b'char':
|
|
||||||
if dna_size == 1:
|
|
||||||
# Single char, assume it's bitflag or int value, and not a string/bytes data...
|
# Single char, assume it's bitflag or int value, and not a string/bytes data...
|
||||||
return DNA_IO.read_char(handle, header)
|
return types.read_char(fileobj)
|
||||||
if use_str:
|
if nil_terminated:
|
||||||
if use_nil:
|
data = types.read_bytes0(fileobj, dna_name.array_size)
|
||||||
return DNA_IO.read_string0(handle, dna_name.array_size)
|
|
||||||
else:
|
|
||||||
return DNA_IO.read_string(handle, dna_name.array_size)
|
|
||||||
else:
|
else:
|
||||||
if use_nil:
|
data = fileobj.read(dna_name.array_size)
|
||||||
return DNA_IO.read_bytes0(handle, dna_name.array_size)
|
|
||||||
else:
|
if as_str:
|
||||||
return DNA_IO.read_bytes(handle, dna_name.array_size)
|
return data.decode('utf8')
|
||||||
else:
|
return data
|
||||||
|
|
||||||
|
simple_readers = {
|
||||||
|
b'int': types.read_int,
|
||||||
|
b'short': types.read_short,
|
||||||
|
b'uint64_t': types.read_ulong,
|
||||||
|
b'float': types.read_float,
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
simple_reader = simple_readers[dna_type.dna_type_id]
|
||||||
|
except KeyError:
|
||||||
raise NotImplementedError("%r exists but isn't pointer, can't resolve field %r" %
|
raise NotImplementedError("%r exists but isn't pointer, can't resolve field %r" %
|
||||||
(path, dna_name.name_only), dna_name, dna_type)
|
(path, dna_name.name_only), dna_name, dna_type)
|
||||||
|
|
||||||
|
if dna_name.array_size > 1:
|
||||||
|
return [simple_reader(fileobj) for _ in range(dna_name.array_size)]
|
||||||
|
return simple_reader(fileobj)
|
||||||
|
|
||||||
def field_set(self, header, handle, path, value):
|
def field_set(self, header, handle, path, value):
|
||||||
assert (type(path) == bytes)
|
assert (type(path) == bytes)
|
||||||
|
|
||||||
|
|||||||
@ -5,7 +5,7 @@ import typing
|
|||||||
|
|
||||||
|
|
||||||
class EndianIO:
|
class EndianIO:
|
||||||
UCHAR = struct.Struct(b'<b')
|
UCHAR = struct.Struct(b'<B')
|
||||||
USHORT = struct.Struct(b'<H')
|
USHORT = struct.Struct(b'<H')
|
||||||
USHORT2 = struct.Struct(b'<HH') # two shorts in a row
|
USHORT2 = struct.Struct(b'<HH') # two shorts in a row
|
||||||
SSHORT = struct.Struct(b'<h')
|
SSHORT = struct.Struct(b'<h')
|
||||||
@ -48,17 +48,47 @@ class EndianIO:
|
|||||||
return cls._read(fileobj, cls.ULONG)
|
return cls._read(fileobj, cls.ULONG)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def read_pointer(cls, fileobj: typing.BinaryIO, fileheader):
|
def read_pointer(cls, fileobj: typing.BinaryIO, pointer_size: int):
|
||||||
"""Read a pointer from a file.
|
"""Read a pointer from a file."""
|
||||||
|
|
||||||
The pointer size is given by the header (BlendFileHeader).
|
if pointer_size == 4:
|
||||||
"""
|
|
||||||
|
|
||||||
if fileheader.pointer_size == 4:
|
|
||||||
return cls.read_uint(fileobj)
|
return cls.read_uint(fileobj)
|
||||||
if fileheader.pointer_size == 8:
|
if pointer_size == 8:
|
||||||
return cls.read_ulong(fileobj)
|
return cls.read_ulong(fileobj)
|
||||||
raise ValueError('unsupported pointer size %d' % fileheader.pointer_size)
|
raise ValueError('unsupported pointer size %d' % pointer_size)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def write_string(cls, fileobj: typing.BinaryIO, astring: str, fieldlen: int):
|
||||||
|
assert isinstance(astring, str)
|
||||||
|
# TODO: truncate the string on a UTF-8 character boundary to avoid creating invalid UTF-8.
|
||||||
|
cls.write_bytes(fileobj, astring.encode('utf-8'), fieldlen)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def write_bytes(cls, fileobj: typing.BinaryIO, data: bytes, fieldlen: int):
|
||||||
|
assert isinstance(data, (bytes, bytearray))
|
||||||
|
if len(data) >= fieldlen:
|
||||||
|
to_write = data[0:fieldlen]
|
||||||
|
else:
|
||||||
|
to_write = data + b'\0'
|
||||||
|
|
||||||
|
fileobj.write(to_write)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def read_bytes0(cls, fileobj, length):
|
||||||
|
data = fileobj.read(length)
|
||||||
|
return cls.read_data0(data)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def read_data0_offset(cls, data, offset):
|
||||||
|
add = data.find(b'\0', offset) - offset
|
||||||
|
return data[offset:offset + add]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def read_data0(cls, data):
|
||||||
|
add = data.find(b'\0')
|
||||||
|
if add < 0:
|
||||||
|
return data
|
||||||
|
return data[:add]
|
||||||
|
|
||||||
|
|
||||||
class LittleEndianTypes(EndianIO):
|
class LittleEndianTypes(EndianIO):
|
||||||
@ -66,7 +96,7 @@ class LittleEndianTypes(EndianIO):
|
|||||||
|
|
||||||
|
|
||||||
class BigEndianTypes(LittleEndianTypes):
|
class BigEndianTypes(LittleEndianTypes):
|
||||||
UCHAR = struct.Struct(b'>b')
|
UCHAR = struct.Struct(b'>B')
|
||||||
USHORT = struct.Struct(b'>H')
|
USHORT = struct.Struct(b'>H')
|
||||||
USHORT2 = struct.Struct(b'>HH') # two shorts in a row
|
USHORT2 = struct.Struct(b'>HH') # two shorts in a row
|
||||||
SSHORT = struct.Struct(b'>h')
|
SSHORT = struct.Struct(b'>h')
|
||||||
@ -74,41 +104,3 @@ class BigEndianTypes(LittleEndianTypes):
|
|||||||
SINT = struct.Struct(b'>i')
|
SINT = struct.Struct(b'>i')
|
||||||
FLOAT = struct.Struct(b'>f')
|
FLOAT = struct.Struct(b'>f')
|
||||||
ULONG = struct.Struct(b'>Q')
|
ULONG = struct.Struct(b'>Q')
|
||||||
|
|
||||||
|
|
||||||
def write_string(fileobj: typing.BinaryIO, astring: str, fieldlen: int):
|
|
||||||
assert isinstance(astring, str)
|
|
||||||
write_bytes(fileobj, astring.encode('utf-8'), fieldlen)
|
|
||||||
|
|
||||||
|
|
||||||
def write_bytes(fileobj: typing.BinaryIO, data: bytes, fieldlen: int):
|
|
||||||
assert isinstance(data, (bytes, bytearray))
|
|
||||||
if len(data) >= fieldlen:
|
|
||||||
to_write = data[0:fieldlen]
|
|
||||||
else:
|
|
||||||
to_write = data + b'\0'
|
|
||||||
|
|
||||||
fileobj.write(to_write)
|
|
||||||
|
|
||||||
|
|
||||||
def read_bytes0(fileobj, length):
|
|
||||||
data = fileobj.read(length)
|
|
||||||
return read_data0(data)
|
|
||||||
|
|
||||||
|
|
||||||
def read_string(fileobj, length):
|
|
||||||
return fileobj.read(length).decode('utf-8')
|
|
||||||
|
|
||||||
|
|
||||||
def read_string0(fileobj, length):
|
|
||||||
return read_bytes0(fileobj, length).decode('utf-8')
|
|
||||||
|
|
||||||
|
|
||||||
def read_data0_offset(data, offset):
|
|
||||||
add = data.find(b'\0', offset) - offset
|
|
||||||
return data[offset:offset + add]
|
|
||||||
|
|
||||||
|
|
||||||
def read_data0(data):
|
|
||||||
add = data.find(b'\0')
|
|
||||||
return data[:add]
|
|
||||||
|
|||||||
55
blender_asset_tracer/blendfile/header.py
Normal file
55
blender_asset_tracer/blendfile/header.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import logging
|
||||||
|
import pathlib
|
||||||
|
import struct
|
||||||
|
import typing
|
||||||
|
|
||||||
|
from . import dna_io, exceptions
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class BlendFileHeader:
|
||||||
|
"""
|
||||||
|
BlendFileHeader represents the first 12 bytes of a blend file.
|
||||||
|
|
||||||
|
it contains information about the hardware architecture, which is relevant
|
||||||
|
to the structure of the rest of the file.
|
||||||
|
"""
|
||||||
|
structure = struct.Struct(b'7s1s1s3s')
|
||||||
|
|
||||||
|
def __init__(self, fileobj: typing.BinaryIO, path: pathlib.Path):
|
||||||
|
log.debug("reading blend-file-header %s", path)
|
||||||
|
header = fileobj.read(self.structure.size)
|
||||||
|
values = self.structure.unpack(header)
|
||||||
|
|
||||||
|
self.magic = values[0]
|
||||||
|
|
||||||
|
pointer_size_id = values[1]
|
||||||
|
if pointer_size_id == b'-':
|
||||||
|
self.pointer_size = 8
|
||||||
|
elif pointer_size_id == b'_':
|
||||||
|
self.pointer_size = 4
|
||||||
|
else:
|
||||||
|
raise exceptions.BlendFileError('invalid pointer size %r' % pointer_size_id, path)
|
||||||
|
|
||||||
|
endian_id = values[2]
|
||||||
|
if endian_id == b'v':
|
||||||
|
self.endian = dna_io.LittleEndianTypes
|
||||||
|
self.endian_str = b'<' # indication for struct.Struct()
|
||||||
|
elif endian_id == b'V':
|
||||||
|
self.endian = dna_io.BigEndianTypes
|
||||||
|
self.endian_str = b'>' # indication for struct.Struct()
|
||||||
|
else:
|
||||||
|
raise exceptions.BlendFileError('invalid endian indicator %r' % endian_id, path)
|
||||||
|
|
||||||
|
version_id = values[3]
|
||||||
|
self.version = int(version_id)
|
||||||
|
|
||||||
|
def create_block_header_struct(self) -> struct.Struct:
|
||||||
|
"""Create a Struct instance for parsing data block headers."""
|
||||||
|
return struct.Struct(b''.join((
|
||||||
|
self.endian_str,
|
||||||
|
b'4sI',
|
||||||
|
b'I' if self.pointer_size == 4 else b'Q',
|
||||||
|
b'II',
|
||||||
|
)))
|
||||||
@ -1,6 +1,9 @@
|
|||||||
|
import io
|
||||||
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
from blender_asset_tracer.blendfile import dna
|
from blender_asset_tracer.blendfile import dna, dna_io
|
||||||
|
|
||||||
|
|
||||||
class NameTest(unittest.TestCase):
|
class NameTest(unittest.TestCase):
|
||||||
@ -68,18 +71,161 @@ class NameTest(unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
class StructTest(unittest.TestCase):
|
class StructTest(unittest.TestCase):
|
||||||
def test_field_from_path(self):
|
class FakeHeader:
|
||||||
|
pointer_size = 8
|
||||||
|
endian = dna_io.BigEndianTypes
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.s = dna.Struct(b'AlembicObjectPath')
|
||||||
|
self.s_char = dna.Struct(b'char', 1)
|
||||||
|
self.s_float = dna.Struct(b'float', 4)
|
||||||
|
self.s_uint64 = dna.Struct(b'uint64_t', 8)
|
||||||
|
self.s_uint128 = dna.Struct(b'uint128_t', 16) # non-supported type
|
||||||
|
|
||||||
|
self.f_next = dna.Field(self.s, dna.Name(b'*next'), 8, 0)
|
||||||
|
self.f_prev = dna.Field(self.s, dna.Name(b'*prev'), 8, 8)
|
||||||
|
self.f_path = dna.Field(self.s_char, dna.Name(b'path[4096]'), 4096, 16)
|
||||||
|
self.f_pointer = dna.Field(self.s_char, dna.Name(b'*ptr'), 3 * 8, 4112)
|
||||||
|
self.f_number = dna.Field(self.s_uint64, dna.Name(b'numbah'), 8, 4136)
|
||||||
|
self.f_floaty = dna.Field(self.s_float, dna.Name(b'floaty[2]'), 2 * 4, 4144)
|
||||||
|
self.f_flag = dna.Field(self.s_char, dna.Name(b'bitflag'), 1, 4152)
|
||||||
|
self.f_bignum = dna.Field(self.s_uint128, dna.Name(b'bignum'), 16, 4153)
|
||||||
|
|
||||||
|
self.s.append_field(self.f_next)
|
||||||
|
self.s.append_field(self.f_prev)
|
||||||
|
self.s.append_field(self.f_path)
|
||||||
|
self.s.append_field(self.f_pointer)
|
||||||
|
self.s.append_field(self.f_number)
|
||||||
|
self.s.append_field(self.f_floaty)
|
||||||
|
self.s.append_field(self.f_flag)
|
||||||
|
self.s.append_field(self.f_bignum)
|
||||||
|
|
||||||
|
def test_autosize(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Maybe it would be better to just return 0 on empty structs.
|
||||||
|
# They are actually used in Blendfiles (for example
|
||||||
|
# AbcArchiveHandle), but when actually loading from a blendfile
|
||||||
|
# the size property is explicitly set anyway. The situation we
|
||||||
|
# test here is for manually created Struct instances that don't
|
||||||
|
# have any fields.
|
||||||
|
dna.Struct(b'EmptyStruct').size
|
||||||
|
|
||||||
|
# Create AlebicObjectPath as it is actually used in Blender 2.79a
|
||||||
s = dna.Struct(b'AlembicObjectPath')
|
s = dna.Struct(b'AlembicObjectPath')
|
||||||
f_next = dna.Field(s, dna.Name(b'*next'), 8, 0)
|
f_next = dna.Field(s, dna.Name(b'*next'), 8, 0)
|
||||||
f_prev = dna.Field(s, dna.Name(b'*prev'), 8, 8)
|
f_prev = dna.Field(s, dna.Name(b'*prev'), 8, 8)
|
||||||
f_path = dna.Field(dna.Struct(b'char'), dna.Name(b'path[4096]'), 4096, 16)
|
f_path = dna.Field(self.s_char, dna.Name(b'path[4096]'), 4096, 16)
|
||||||
f_pointer = dna.Field(dna.Struct(b'char'), dna.Name(b'*ptr'), 3 * 8, 16 + 4096)
|
|
||||||
s.append_field(f_next)
|
s.append_field(f_next)
|
||||||
s.append_field(f_prev)
|
s.append_field(f_prev)
|
||||||
s.append_field(f_path)
|
s.append_field(f_path)
|
||||||
s.append_field(f_pointer)
|
|
||||||
|
|
||||||
|
self.assertEqual(s.size, 4112)
|
||||||
|
|
||||||
|
def test_field_from_path(self):
|
||||||
psize = 8
|
psize = 8
|
||||||
self.assertEqual(s.field_from_path(psize, b'path'), (f_path, 16))
|
self.assertEqual(self.s.field_from_path(psize, b'path'),
|
||||||
self.assertEqual(s.field_from_path(psize, (b'prev', b'path')), (f_path, 16))
|
(self.f_path, 16))
|
||||||
self.assertEqual(s.field_from_path(psize, (b'ptr', 2)), (f_pointer, 16 + 4096 + 2 * psize))
|
self.assertEqual(self.s.field_from_path(psize, (b'prev', b'path')),
|
||||||
|
(self.f_path, 16))
|
||||||
|
self.assertEqual(self.s.field_from_path(psize, (b'ptr', 2)),
|
||||||
|
(self.f_pointer, 16 + 4096 + 2 * psize))
|
||||||
|
self.assertEqual(self.s.field_from_path(psize, (b'floaty', 1)),
|
||||||
|
(self.f_floaty, 4144 + self.s_float.size))
|
||||||
|
|
||||||
|
with self.assertRaises(OverflowError):
|
||||||
|
self.s.field_from_path(psize, (b'floaty', 2))
|
||||||
|
|
||||||
|
with self.assertRaises(KeyError):
|
||||||
|
self.s.field_from_path(psize, b'non-existant')
|
||||||
|
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
self.s.field_from_path(psize, 'path')
|
||||||
|
|
||||||
|
def test_simple_field_get(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa'
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'numbah')
|
||||||
|
|
||||||
|
self.assertEqual(val, 0x1020304fffefdfa)
|
||||||
|
fileobj.seek.assert_called_with(4136, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_field_get_default(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.side_effect = RuntimeError
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'nonexistant', default=519871531)
|
||||||
|
|
||||||
|
self.assertEqual(val, 519871531)
|
||||||
|
fileobj.seek.assert_not_called()
|
||||||
|
|
||||||
|
def test_field_get_nonexistant(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.side_effect = RuntimeError
|
||||||
|
|
||||||
|
with self.assertRaises(KeyError):
|
||||||
|
self.s.field_get(self.FakeHeader(), fileobj, b'nonexistant')
|
||||||
|
fileobj.seek.assert_not_called()
|
||||||
|
|
||||||
|
def test_field_get_unsupported_type(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.side_effect = RuntimeError
|
||||||
|
|
||||||
|
with self.assertRaises(NotImplementedError):
|
||||||
|
self.s.field_get(self.FakeHeader(), fileobj, b'bignum')
|
||||||
|
fileobj.seek.assert_called_with(4153, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_pointer_field_get(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\xf0\x9f\xa6\x87\x00dum'
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'ptr')
|
||||||
|
|
||||||
|
self.assertEqual(0xf09fa6870064756d, val)
|
||||||
|
fileobj.seek.assert_called_with(4112, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_string_field_get(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\xf0\x9f\xa6\x87\x00dummydata'
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=True)
|
||||||
|
|
||||||
|
self.assertEqual('🦇', val)
|
||||||
|
fileobj.seek.assert_called_with(16, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_string_field_get_single_char(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\xf0'
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'bitflag')
|
||||||
|
|
||||||
|
self.assertEqual(0xf0, val)
|
||||||
|
fileobj.seek.assert_called_with(4152, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_string_field_get_invalid_utf8(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa'
|
||||||
|
|
||||||
|
with self.assertRaises(UnicodeDecodeError):
|
||||||
|
self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=True)
|
||||||
|
|
||||||
|
def test_string_field_get_bytes_null_terminated(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa\x00dummydata'
|
||||||
|
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=False)
|
||||||
|
self.assertEqual(b'\x01\x02\x03\x04\xff\xfe\xfd\xfa', val)
|
||||||
|
fileobj.seek.assert_called_with(16, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_string_field_get_bytes(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.return_value = b'\x01\x02\x03\x04\xff\xfe\xfd\xfa\x00dummydata'
|
||||||
|
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'path', as_str=False,
|
||||||
|
nil_terminated=False)
|
||||||
|
self.assertEqual(b'\x01\x02\x03\x04\xff\xfe\xfd\xfa\x00dummydata', val)
|
||||||
|
fileobj.seek.assert_called_with(16, os.SEEK_CUR)
|
||||||
|
|
||||||
|
def test_string_field_get_float_array(self):
|
||||||
|
fileobj = mock.MagicMock(io.BufferedReader)
|
||||||
|
fileobj.read.side_effect = (b'@333', b'@2\x8f\\')
|
||||||
|
|
||||||
|
val = self.s.field_get(self.FakeHeader(), fileobj, b'floaty')
|
||||||
|
self.assertAlmostEqual(2.8, val[0])
|
||||||
|
self.assertAlmostEqual(2.79, val[1])
|
||||||
|
fileobj.seek.assert_called_with(4144, os.SEEK_CUR)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user