# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
# ***** END GPL LICENSE BLOCK *****
#
# (c) 2009, At Mind B.V. - Jeroen Bakker
# (c) 2014, Blender Foundation - Campbell Barton
# (c) 2018, Blender Foundation - Sybren A. Stüvel
import atexit
import collections
import functools
import gzip
import logging
import os
import struct
import pathlib
import shutil
import tempfile
import typing

from . import exceptions, dna_io, dna, header
from blender_asset_tracer import bpathlib

log = logging.getLogger(__name__)

FILE_BUFFER_SIZE = 1024 * 1024
BLENDFILE_MAGIC = b'BLENDER'
GZIP_MAGIC = b'\x1f\x8b'

BFBList = typing.List['BlendFileBlock']

_cached_bfiles = {}  # type: typing.Dict[pathlib.Path, BlendFile]


def open_cached(path: pathlib.Path, mode='rb',
                assert_cached: typing.Optional[bool] = None) -> 'BlendFile':
    """Open a blend file, ensuring it is only opened once."""
    my_log = log.getChild('open_cached')
    bfile_path = bpathlib.make_absolute(path)

    if assert_cached is not None:
        is_cached = bfile_path in _cached_bfiles
        if assert_cached and not is_cached:
            raise AssertionError('File %s was not cached' % bfile_path)
        elif not assert_cached and is_cached:
            raise AssertionError('File %s was cached' % bfile_path)

    try:
        bfile = _cached_bfiles[bfile_path]
    except KeyError:
        my_log.debug('Opening non-cached %s', path)
        bfile = BlendFile(path, mode=mode)
        _cached_bfiles[bfile_path] = bfile
    else:
        my_log.debug('Returning cached %s', path)

    return bfile


@atexit.register
def close_all_cached() -> None:
    if not _cached_bfiles:
        # Don't even log anything when there is nothing to close.
        return
    log.debug('Closing %d cached blend files', len(_cached_bfiles))
    for bfile in list(_cached_bfiles.values()):
        bfile.close()
    _cached_bfiles.clear()


def _cache(path: pathlib.Path, bfile: 'BlendFile'):
    """Add a BlendFile to the cache."""
    bfile_path = bpathlib.make_absolute(path)
    _cached_bfiles[bfile_path] = bfile


def _uncache(path: pathlib.Path):
    """Remove a BlendFile object from the cache."""
    bfile_path = bpathlib.make_absolute(path)
    _cached_bfiles.pop(bfile_path, None)


class BlendFile:
    """Representation of a blend file.

    :ivar filepath: which file this object represents.
    :ivar raw_filepath: which file is accessed; same as filepath for
        uncompressed files, but a temporary file for compressed files.
    :ivar fileobj: the file object that's being accessed.
    """
    log = log.getChild('BlendFile')

    def __init__(self, path: pathlib.Path, mode='rb') -> None:
        """Create a BlendFile instance for the blend file at the path.

        Opens the file for reading or writing, depending on the mode.
        Compressed blend files are uncompressed to a temporary location
        before opening.
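
        A minimal usage sketch (the file name here is hypothetical)::

            bfile = BlendFile(pathlib.Path('scene.blend'))
            mesh_blocks = bfile.find_blocks_from_code(b'ME')
            bfile.close()
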
        :param path: the file to open
        :param mode: see mode description of pathlib.Path.open()
        """
        self.filepath = path
        self.raw_filepath = path
        self._is_modified = False
        self.fileobj = self._open_file(path, mode)

        self.blocks = []  # type: BFBList
        """BlendFileBlocks of this file, in disk order."""

        self.code_index = collections.defaultdict(list)  # type: typing.Dict[bytes, BFBList]
        self.structs = []  # type: typing.List[dna.Struct]
        self.sdna_index_from_id = {}  # type: typing.Dict[bytes, int]
        self.block_from_addr = {}  # type: typing.Dict[int, BlendFileBlock]

        self.header = header.BlendFileHeader(self.fileobj, self.raw_filepath)
        self.block_header_struct = self.header.create_block_header_struct()
        self._load_blocks()

    def _open_file(self, path: pathlib.Path, mode: str) -> typing.IO[bytes]:
        """Open a blend file, decompressing if necessary.

        This does not parse the blend file yet, just makes sure that
        self.fileobj is opened and that self.filepath and self.raw_filepath
        are set.

        :raises exceptions.BlendFileError: when the blend file doesn't have
            the correct magic bytes.
        """
        if 'b' not in mode:
            raise ValueError('Only binary modes are supported, not %r' % mode)

        self.filepath = path
        fileobj = path.open(mode, buffering=FILE_BUFFER_SIZE)  # typing.IO[bytes]
        fileobj.seek(0, os.SEEK_SET)

        magic = fileobj.read(len(BLENDFILE_MAGIC))
        if magic == BLENDFILE_MAGIC:
            self.is_compressed = False
            self.raw_filepath = path
            return fileobj

        if magic[:2] == GZIP_MAGIC:
            self.is_compressed = True
            log.debug("compressed blendfile detected: %s", path)

            # Decompress to a temporary file.
            tmpfile = tempfile.NamedTemporaryFile()
            fileobj.seek(0, os.SEEK_SET)
            with gzip.GzipFile(fileobj=fileobj, mode=mode) as gzfile:
                magic = gzfile.read(len(BLENDFILE_MAGIC))
                if magic != BLENDFILE_MAGIC:
                    raise exceptions.BlendFileError("Compressed file is not a blend file", path)

                data = magic
                while data:
                    tmpfile.write(data)
                    data = gzfile.read(FILE_BUFFER_SIZE)

            # Further interaction should be done with the uncompressed file.
            self.raw_filepath = pathlib.Path(tmpfile.name)
            fileobj.close()
            return tmpfile

        fileobj.close()
        raise exceptions.BlendFileError("File is not a blend file", path)

    def _load_blocks(self) -> None:
        """Read the blend file to load its DNA structure to memory."""
        self.structs.clear()
        self.sdna_index_from_id.clear()
        while True:
            block = BlendFileBlock(self)
            if block.code == b'ENDB':
                break

            if block.code == b'DNA1':
                self.decode_structs(block)
            else:
                self.fileobj.seek(block.size, os.SEEK_CUR)

            self.blocks.append(block)
            self.code_index[block.code].append(block)
            self.block_from_addr[block.addr_old] = block

        if not self.structs:
            raise exceptions.NoDNA1Block("No DNA1 block in file, not a valid .blend file",
                                         self.filepath)

    def __repr__(self) -> str:
        clsname = self.__class__.__qualname__
        if self.filepath == self.raw_filepath:
            return '<%s %r>' % (clsname, self.filepath)
        return '<%s %r reading from %r>' % (clsname, self.filepath, self.raw_filepath)

    def __enter__(self) -> 'BlendFile':
        return self

    def __exit__(self, exctype, excvalue, traceback) -> None:
        self.close()

    def copy_and_rebind(self, path: pathlib.Path, mode='rb') -> None:
        """Change which file is bound to this BlendFile.

        This allows cloning a previously opened file, and rebinding it to
        reuse the already-loaded DNA structs and data blocks.
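
        A sketch of the intended use (both paths are hypothetical)::

            bfile = open_cached(pathlib.Path('lib.blend'))
            bfile.copy_and_rebind(pathlib.Path('lib_copy.blend'), mode='rb+')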
""" log.debug('Rebinding %r to %s', self, path) self.close() _uncache(self.filepath) self.log.debug('Copying %s to %s', self.filepath, path) # TODO(Sybren): remove str() calls when targeting Python 3.6+ shutil.copy(str(self.filepath), str(path)) self.fileobj = self._open_file(path, mode=mode) _cache(path, self) @property def is_modified(self) -> bool: return self._is_modified def mark_modified(self) -> None: """Recompess the file when it is closed.""" self.log.debug('Marking %s as modified', self.raw_filepath) self._is_modified = True def find_blocks_from_code(self, code: bytes) -> typing.List['BlendFileBlock']: assert isinstance(code, bytes) return self.code_index[code] def close(self) -> None: """Close the blend file. Recompresses the blend file if it was compressed and changed. """ if not self.fileobj: return if self._is_modified: log.debug('closing blend file %s after it was modified', self.raw_filepath) if self._is_modified and self.is_compressed: log.debug("recompressing modified blend file %s", self.raw_filepath) self.fileobj.seek(os.SEEK_SET, 0) with gzip.open(str(self.filepath), 'wb') as gzfile: while True: data = self.fileobj.read(FILE_BUFFER_SIZE) if not data: break gzfile.write(data) log.debug("compressing to %s finished", self.filepath) # Close the file object after recompressing, as it may be a temporary # file that'll disappear as soon as we close it. self.fileobj.close() self._is_modified = False try: del _cached_bfiles[self.filepath] except KeyError: pass def ensure_subtype_smaller(self, sdna_index_curr, sdna_index_next) -> None: # never refine to a smaller type curr_struct = self.structs[sdna_index_curr] next_struct = self.structs[sdna_index_next] if curr_struct.size > next_struct.size: raise RuntimeError("Can't refine to smaller type (%s -> %s)" % (curr_struct.dna_type_id.decode('utf-8'), next_struct.dna_type_id.decode('utf-8'))) def decode_structs(self, block: 'BlendFileBlock'): """ DNACatalog is a catalog of all information in the DNA1 file-block """ self.log.debug("building DNA catalog") # Get some names in the local scope for faster access. 
        # Get some names in the local scope for faster access.
        structs = self.structs
        sdna_index_from_id = self.sdna_index_from_id
        endian = self.header.endian
        shortstruct = endian.USHORT
        shortstruct2 = endian.USHORT2
        intstruct = endian.UINT
        assert intstruct.size == 4

        def pad_up_4(off: int) -> int:
            return (off + 3) & ~3

        data = self.fileobj.read(block.size)
        types = []
        typenames = []

        offset = 8
        names_len = intstruct.unpack_from(data, offset)[0]
        offset += 4

        self.log.debug("building #%d names" % names_len)
        for _ in range(names_len):
            typename = endian.read_data0_offset(data, offset)
            offset = offset + len(typename) + 1
            typenames.append(dna.Name(typename))

        offset = pad_up_4(offset)
        offset += 4
        types_len = intstruct.unpack_from(data, offset)[0]
        offset += 4
        self.log.debug("building #%d types" % types_len)
        for _ in range(types_len):
            dna_type_id = endian.read_data0_offset(data, offset)
            types.append(dna.Struct(dna_type_id))
            offset += len(dna_type_id) + 1

        offset = pad_up_4(offset)
        offset += 4
        self.log.debug("building #%d type-lengths" % types_len)
        for i in range(types_len):
            typelen = shortstruct.unpack_from(data, offset)[0]
            offset = offset + 2
            types[i].size = typelen

        offset = pad_up_4(offset)
        offset += 4

        structs_len = intstruct.unpack_from(data, offset)[0]
        offset += 4
        log.debug("building #%d structures" % structs_len)
        pointer_size = self.header.pointer_size
        for sdna_index in range(structs_len):
            struct_type_index, fields_len = shortstruct2.unpack_from(data, offset)
            offset += 4

            dna_struct = types[struct_type_index]
            sdna_index_from_id[dna_struct.dna_type_id] = sdna_index
            structs.append(dna_struct)

            dna_offset = 0
            for field_index in range(fields_len):
                field_type_index, field_name_index = shortstruct2.unpack_from(data, offset)
                offset += 4

                dna_type = types[field_type_index]
                dna_name = typenames[field_name_index]

                if dna_name.is_pointer or dna_name.is_method_pointer:
                    dna_size = pointer_size * dna_name.array_size
                else:
                    dna_size = dna_type.size * dna_name.array_size

                field = dna.Field(dna_type, dna_name, dna_size, dna_offset)
                dna_struct.append_field(field)
                dna_offset += dna_size

    def abspath(self, relpath: bpathlib.BlendPath) -> bpathlib.BlendPath:
        """Construct an absolute path from a blendfile-relative path."""
        if relpath.is_absolute():
            return relpath

        bfile_dir = self.filepath.absolute().parent
        root = bpathlib.BlendPath(bfile_dir)
        abspath = relpath.absolute(root)

        my_log = self.log.getChild('abspath')
        my_log.debug('Resolved %s relative to %s to %s', relpath, self.filepath, abspath)

        return abspath

    def dereference_pointer(self, address: int) -> 'BlendFileBlock':
        """Return the pointed-to block, or raise SegmentationFault."""
        try:
            return self.block_from_addr[address]
        except KeyError:
            raise exceptions.SegmentationFault('address does not exist', address) from None

    def struct(self, name: bytes) -> dna.Struct:
        index = self.sdna_index_from_id[name]
        return self.structs[index]


@functools.total_ordering
class BlendFileBlock:
    """Instance of a struct."""

    # Due to the huge number of BlendFileBlock objects created for packing a
    # production-size blend file, using slots here actually makes the
    # dependency tracer significantly (p<0.001) faster. In my test case the
    # speed improvement was 16% for a 'bam list' command.
    __slots__ = (
        'bfile', 'code', 'size', 'addr_old', 'sdna_index',
        'count', 'file_offset', 'endian', '_id_name',
    )

    log = log.getChild('BlendFileBlock')
    old_structure = struct.Struct(b'4sI')
    """old blend files ENDB block structure"""

    def __init__(self, bfile: BlendFile) -> None:
        self.bfile = bfile
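
        # A block header consists of:
        #   - 4-byte block code (e.g. b'DATA', b'DNA1', b'ENDB')
        #   - 4-byte size of the data that follows
        #   - old memory address (4 or 8 bytes, matching the pointer size)
        #   - 4-byte SDNA struct index
        #   - 4-byte count of structs in the block
        # which is where the 20-byte (32-bit) and 24-byte (64-bit) header
        # sizes mentioned below come from.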
        # Defaults; actual values are set by interpreting the block header.
        self.code = b''
        self.size = 0
        self.addr_old = 0
        self.sdna_index = 0
        self.count = 0
        self.file_offset = 0
        """Offset in bytes from start of file to beginning of the data block.

        Points to the data after the block header.
        """
        self.endian = bfile.header.endian
        self._id_name = ...  # type: typing.Union[None, ellipsis, bytes]

        header_struct = bfile.block_header_struct
        data = bfile.fileobj.read(header_struct.size)
        if len(data) != header_struct.size:
            self.log.warning("Blend file %s seems to be truncated, "
                             "expected %d bytes but could read only %d",
                             bfile.filepath, header_struct.size, len(data))
            self.code = b'ENDB'
            return

        # header size can be 8, 20, or 24 bytes long:
        # 8: old blend files ENDB block (exception)
        # 20: normal headers 32 bit platform
        # 24: normal headers 64 bit platform
        if len(data) <= 15:
            self.log.debug('interpreting block as old-style ENDB block')
            blockheader = self.old_structure.unpack(data)
            self.code = self.endian.read_data0(blockheader[0])
            return

        blockheader = header_struct.unpack(data)
        self.code = self.endian.read_data0(blockheader[0])
        if self.code != b'ENDB':
            self.size = blockheader[1]
            self.addr_old = blockheader[2]
            self.sdna_index = blockheader[3]
            self.count = blockheader[4]
            self.file_offset = bfile.fileobj.tell()

    def __repr__(self) -> str:
        return "<%s.%s (%s), size=%d at %s>" % (
            self.__class__.__name__,
            self.dna_type_name,
            self.code.decode(),
            self.size,
            hex(self.addr_old),
        )

    def __hash__(self) -> int:
        return hash((self.code, self.addr_old, self.bfile.filepath))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BlendFileBlock):
            return False
        return (self.code == other.code and
                self.addr_old == other.addr_old and
                self.bfile.filepath == other.bfile.filepath)

    def __lt__(self, other: 'BlendFileBlock') -> bool:
        """Order blocks by file path and offset within that file."""
        if not isinstance(other, BlendFileBlock):
            return NotImplemented
        my_key = self.bfile.filepath, self.file_offset
        other_key = other.bfile.filepath, other.file_offset
        return my_key < other_key

    def __bool__(self) -> bool:
        """Data blocks are always True."""
        return True

    @property
    def dna_type(self) -> dna.Struct:
        return self.bfile.structs[self.sdna_index]

    @property
    def dna_type_id(self) -> bytes:
        return self.dna_type.dna_type_id

    @property
    def dna_type_name(self) -> str:
        return self.dna_type_id.decode('ascii')

    @property
    def id_name(self) -> typing.Optional[bytes]:
        """Same as block[b'id', b'name']; None if there is no such field.

        Evaluated only once, so safe to call multiple times without producing
        excessive disk I/O.
        """
        if self._id_name is ...:
            try:
                self._id_name = self[b'id', b'name']
            except KeyError:
                self._id_name = None

        # TODO(Sybren): figure out how to let mypy know self._id_name cannot
        # be ellipsis at this point.
        return self._id_name  # type: ignore

    def refine_type_from_index(self, sdna_index: int):
        """Change the DNA Struct associated with this block.

        Use to make a block type more specific, for example when you have a
        modifier but need to access it as SubSurfModifier.

        :param sdna_index: the SDNA index of the DNA type.
        """
        assert type(sdna_index) is int
        sdna_index_curr = self.sdna_index
        self.bfile.ensure_subtype_smaller(sdna_index_curr, sdna_index)
        self.sdna_index = sdna_index

    def refine_type(self, dna_type_id: bytes):
        """Change the DNA Struct associated with this block.

        Use to make a block type more specific, for example when you have a
        modifier but need to access it as SubSurfModifier.

        :param dna_type_id: the name of the DNA type.
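
        For example, to treat a generic modifier block as a subdivision
        surface modifier (the exact DNA struct name depends on the Blender
        version and is shown here as an illustration)::

            block.refine_type(b'SubsurfModifierData')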
""" assert isinstance(dna_type_id, bytes) sdna_index = self.bfile.sdna_index_from_id[dna_type_id] self.refine_type_from_index(sdna_index) def abs_offset(self, path: dna.FieldPath) -> typing.Tuple[int, int]: """Compute the absolute file offset of the field. :returns: tuple (offset in bytes, length of array in items) """ field, field_offset = self.dna_type.field_from_path(self.bfile.header.pointer_size, path) return self.file_offset + field_offset, field.name.array_size def get(self, path: dna.FieldPath, default=..., null_terminated=True, as_str=False, return_field=False ) -> typing.Any: """Read a property and return the value. :param path: name of the property (like `b'loc'`), tuple of names to read a sub-property (like `(b'id', b'name')`), or tuple of name and index to read one item from an array (like `(b'loc', 2)`) :param default: The value to return when the field does not exist. Use Ellipsis (the default value) to raise a KeyError instead. :param null_terminated: Only used when reading bytes or strings. When True, stops reading at the first zero byte; be careful with this when reading binary data. :param as_str: When True, automatically decode bytes to string (assumes UTF-8 encoding). :param return_field: When True, returns tuple (dna.Field, value). Otherwise just returns the value. """ self.bfile.fileobj.seek(self.file_offset, os.SEEK_SET) dna_struct = self.bfile.structs[self.sdna_index] field, value = dna_struct.field_get( self.bfile.header, self.bfile.fileobj, path, default=default, null_terminated=null_terminated, as_str=as_str, ) if return_field: return value, field return value def get_recursive_iter(self, path: dna.FieldPath, path_root: dna.FieldPath = b'', default=..., null_terminated=True, as_str=True, ) -> typing.Iterator[typing.Tuple[dna.FieldPath, typing.Any]]: """Generator, yields (path, property value) tuples. If a property cannot be decoded, a string representing its DNA type name is used as its value instead, between pointy brackets. """ path_full = path # type: dna.FieldPath if path_root: if isinstance(path_root, bytes): path_root = (path_root,) if isinstance(path, bytes): path = (path,) path_full = tuple(path_root) + tuple(path) try: # Try accessing as simple property yield (path_full, self.get(path_full, default, null_terminated, as_str)) except exceptions.NoReaderImplemented as ex: # This was not a simple property, so recurse into its DNA Struct. dna_type = ex.dna_type struct_index = self.bfile.sdna_index_from_id.get(dna_type.dna_type_id) if struct_index is None: yield (path_full, "<%s>" % dna_type.dna_type_id.decode('ascii')) return # Recurse through the fields. for f in dna_type.fields: yield from self.get_recursive_iter(f.name.name_only, path_full, default=default, null_terminated=null_terminated, as_str=as_str) def hash(self) -> int: """Generate a pointer-independent hash for the block. Generates a 'hash' that can be used instead of addr_old as block id, which should be 'stable' across .blend file load & save (i.e. it does not changes due to pointer addresses variations). """ # TODO This implementation is most likely far from optimal... and CRC32 # is not kown as the best hashing algo either. But for now does the job! 
        import zlib

        dna_type = self.dna_type
        pointer_size = self.bfile.header.pointer_size
        hsh = 1
        for path, value in self.items_recursive():
            field, _ = dna_type.field_from_path(pointer_size, path)
            if field.name.is_pointer:
                continue
            hsh = zlib.adler32(str(value).encode(), hsh)
        return hsh

    def set(self, path: bytes, value):
        dna_struct = self.bfile.structs[self.sdna_index]
        self.bfile.mark_modified()
        self.bfile.fileobj.seek(self.file_offset, os.SEEK_SET)
        return dna_struct.field_set(self.bfile.header, self.bfile.fileobj, path, value)

    def get_pointer(
            self, path: dna.FieldPath,
            default=...,
    ) -> typing.Union[None, 'BlendFileBlock']:
        """Same as get() but dereferences a pointer.

        :raises exceptions.SegmentationFault: when there is no datablock with
            the pointed-to address.
        """
        result = self.get(path, default=default)

        # If it's not an integer, we have no pointer to follow and this may
        # actually be a non-pointer property.
        if type(result) is not int:
            return result

        if result == 0:
            return None

        try:
            return self.bfile.dereference_pointer(result)
        except exceptions.SegmentationFault as ex:
            ex.field_path = path
            raise

    def iter_array_of_pointers(self, path: dna.FieldPath, array_size: int) \
            -> typing.Iterator['BlendFileBlock']:
        """Dereference pointers from an array-of-pointers field.

        Use this function when you have a field like Mesh materials:
        `Mat **mat`

        :param path: The array-of-pointers field.
        :param array_size: Number of items in the array; when it is 0 the
            generator yields nothing.
        """
        if array_size == 0:
            return

        array = self.get_pointer(path)
        assert array is not None
        assert array.code == b'DATA', \
            'Array data block should have code DATA, is %r' % array.code.decode()
        file_offset = array.file_offset

        endian = self.bfile.header.endian
        ps = self.bfile.header.pointer_size

        for i in range(array_size):
            fileobj = self.bfile.fileobj
            fileobj.seek(file_offset + ps * i, os.SEEK_SET)
            address = endian.read_pointer(fileobj, ps)
            if address == 0:
                continue
            yield self.bfile.dereference_pointer(address)

    def iter_fixed_array_of_pointers(self, path: dna.FieldPath) \
            -> typing.Iterator['BlendFileBlock']:
        """Yield blocks from a fixed-size array field.

        Use this function when you have a field like lamp textures:
        `MTex *mtex[18]`

        The size of the array is determined automatically by the size in
        bytes of the field divided by the pointer size of the blend file.

        :param path: The array field.
        :raises KeyError: if the path does not exist.
        """
        dna_struct = self.dna_type
        ps = self.bfile.header.pointer_size
        endian = self.bfile.header.endian
        fileobj = self.bfile.fileobj

        field, offset_in_struct = dna_struct.field_from_path(ps, path)
        array_size = field.size // ps

        for i in range(array_size):
            fileobj.seek(self.file_offset + offset_in_struct + ps * i, os.SEEK_SET)
            address = endian.read_pointer(fileobj, ps)
            if not address:
                # Fixed-size arrays can contain 0-pointers.
                continue
            yield self.bfile.dereference_pointer(address)

    def __getitem__(self, path: dna.FieldPath):
        return self.get(path)

    def __setitem__(self, item: bytes, value) -> None:
        self.set(item, value)

    def keys(self) -> typing.Iterator[bytes]:
        """Generator, yields all field names of this block."""
        return (f.name.name_only for f in self.dna_type.fields)

    def values(self) -> typing.Iterable[typing.Any]:
        for k in self.keys():
            try:
                yield self[k]
            except exceptions.NoReaderImplemented as ex:
                yield '<%s>' % ex.dna_type.dna_type_id.decode('ascii')

    def items(self) -> typing.Iterable[typing.Tuple[bytes, typing.Any]]:
        for k in self.keys():
            try:
                yield (k, self[k])
            except exceptions.NoReaderImplemented as ex:
                yield (k, '<%s>' % ex.dna_type.dna_type_id.decode('ascii'))

    def items_recursive(self) -> typing.Iterator[typing.Tuple[dna.FieldPath, typing.Any]]:
        """Generator, yields (property path, property value) recursively for all properties."""
        for k in self.keys():
            yield from self.get_recursive_iter(k, as_str=False)
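

# A minimal usage sketch, assuming a blend file path is passed on the command
# line; kept under the __main__ guard so importing this module has no side
# effects.
if __name__ == '__main__':
    import sys

    logging.basicConfig(level=logging.INFO)
    with BlendFile(pathlib.Path(sys.argv[1])) as demo_bfile:
        # Print the name of every object (code b'OB') datablock in the file;
        # id_name yields the raw bytes of the ID name, or None.
        for demo_block in demo_bfile.find_blocks_from_code(b'OB'):
            print(demo_block.id_name)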