548 lines
19 KiB
Python

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
#
# (c) 2009, At Mind B.V. - Jeroen Bakker
# (c) 2014, Blender Foundation - Campbell Barton
# (c) 2018, Blender Foundation - Sybren A. Stüvel
import collections
import gzip
import logging
import os
import struct
import pathlib
import tempfile
import typing
from . import exceptions, dna_io
# Module-level logger; the classes below derive their child loggers from it.
log = logging.getLogger(__name__)
# Chunk size in bytes (1 MiB) used when reading a gzipped blend file while
# decompressing it.
FILE_BUFFER_SIZE = 1024 * 1024
# Byte sequence BlendFile expects at the very start of an uncompressed blend file.
BLENDFILE_MAGIC = b'BLENDER'
# Byte sequence BlendFile checks for to decide that a file is gzip-compressed.
GZIP_MAGIC = b'\x1f\x8b'
def pad_up_4(offset):
    """Round ``offset`` up to the nearest multiple of four.

    Values that already are a multiple of four are returned unchanged.
    """
    # Number of bytes past the previous 4-byte boundary (0..3).
    overshoot = offset & 3
    # Distance to the next boundary; the mask turns a distance of 4 into 0.
    return offset + ((4 - overshoot) & 3)
class BlendFile:
    """Representation of a blend file.

    :ivar filepath: which file this object represents.
    :ivar raw_filepath: which file is accessed; same as filepath for
        uncompressed files, but a temporary file for compressed files.
    :ivar fileobj: the file object that's being accessed.
    :ivar is_compressed: True when the file on disk was gzip-compressed.
    :ivar is_modified: set to True by BlendFileBlock.set() when a field is
        written; starts out False.
    """
    log = log.getChild('BlendFile')

    def __init__(self, path: pathlib.Path, mode='rb'):
        """Create a BlendFile instance for the blend file at the path.

        Opens the file for reading or writing depending on the mode. Compressed
        blend files are uncompressed to a temporary location before opening.

        :param path: the file to open
        :param mode: see mode description of pathlib.Path.open()
        :raises exceptions.BlendFileError: when the file is neither a blend
            file nor a gzip-compressed blend file.
        """
        self.filepath = path
        self.is_modified = False
        self.fileobj = None

        fileobj = path.open(mode)
        try:
            magic = fileobj.read(len(BLENDFILE_MAGIC))
            if magic == BLENDFILE_MAGIC:
                self.is_compressed = False
                self.raw_filepath = path
                self.fileobj = fileobj
            elif magic.startswith(GZIP_MAGIC):
                # The gzip magic is shorter than the blend magic, so only the
                # leading bytes of what was read can be compared.
                self.is_compressed = True
                self.log.debug("compressed blendfile detected: %s", path)
                self.fileobj = self._decompress_to_tempfile(fileobj, path, mode)
                # Further interaction should be done with the uncompressed file.
                self.raw_filepath = pathlib.Path(self.fileobj.name)
                fileobj.close()
            else:
                raise exceptions.BlendFileError("File is not a blend file", path)

            # Sniffing the magic (or writing the temporary file) moved the
            # file position; the header has to be parsed from the start.
            self.fileobj.seek(0, os.SEEK_SET)
            self.header = BlendFileHeader(self.fileobj, self.raw_filepath)
            self.block_header_struct = self.header.create_block_header_struct()
        except BaseException:
            # Do not leak open files when construction fails; closing a file
            # twice is harmless.
            fileobj.close()
            if self.fileobj is not None:
                self.fileobj.close()
            raise

        self.blocks = []
        self.code_index = collections.defaultdict(list)
        self.structs = []
        self.sdna_index_from_id = {}
        self.block_from_offset = {}

    @staticmethod
    def _decompress_to_tempfile(fileobj, path, mode):
        """Gunzip ``fileobj`` into a new named temporary file and return it.

        :param fileobj: the opened, gzip-compressed file.
        :param path: path of the compressed file, used for error reporting.
        :param mode: mode passed on to gzip.GzipFile.
        :raises exceptions.BlendFileError: when the decompressed data does not
            start with the blend file magic.
        """
        tmpfile = tempfile.NamedTemporaryFile()
        try:
            # The gzip stream starts at the beginning of the file, not after
            # the bytes that were read to sniff the magic.
            fileobj.seek(0, os.SEEK_SET)
            with gzip.GzipFile(fileobj=fileobj, mode=mode) as gzfile:
                magic = gzfile.read(len(BLENDFILE_MAGIC))
                if magic != BLENDFILE_MAGIC:
                    raise exceptions.BlendFileError(
                        "Compressed file is not a blend file", path)

                data = magic
                while data:
                    tmpfile.write(data)
                    data = gzfile.read(FILE_BUFFER_SIZE)
        except BaseException:
            tmpfile.close()
            raise
        return tmpfile

    def load_dna1_block(self):
        """Read the blend file to load its DNA structure to memory.

        Walks over all block headers until an ENDB block is found, indexing
        every block by code and by its ``addr_old``. The contents of the DNA1
        block are decoded; the data of all other blocks is skipped.

        :raises exceptions.NoDNA1Block: when no DNA1 block yielded any structs.
        """
        fileobj = self.fileobj

        while True:
            block = BlendFileBlock(fileobj, self)
            if block.code == b'ENDB':
                break

            if block.code == b'DNA1':
                # decode_structs() reads the block data, so no seek is needed.
                self.structs, self.sdna_index_from_id = self.decode_structs(block)
            else:
                fileobj.seek(block.size, os.SEEK_CUR)

            self.blocks.append(block)
            self.code_index[block.code].append(block)

        if not self.structs:
            raise exceptions.NoDNA1Block("No DNA1 block in file, not a valid .blend file",
                                         self.filepath)

        # cache (could lazy init, in case we never use?)
        self.block_from_offset = {block.addr_old: block for block in self.blocks
                                  if block.code != b'ENDB'}

    def __repr__(self):
        clsname = self.__class__.__qualname__
        if self.filepath == self.raw_filepath:
            return '<%s %r>' % (clsname, self.filepath)
        return '<%s %r reading from %r>' % (clsname, self.filepath, self.raw_filepath)

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, traceback):
        self.close()

    def find_blocks_from_code(self, code):
        """Return the list of blocks with the given block code.

        :param code: block code as bytes, e.g. ``b'DNA1'``.
        :returns: the indexed blocks, or an empty list for an unknown code.
        """
        assert (type(code) == bytes)
        # .get() avoids creating an empty entry in the defaultdict.
        return self.code_index.get(code, [])

    def find_block_from_offset(self, offset):
        """Return the block whose ``addr_old`` equals ``offset``, or None."""
        # same as looping over all blocks,
        # then checking ``block.addr_old == offset``
        assert (type(offset) is int)
        return self.block_from_offset.get(offset)

    def close(self):
        """Close the file object that is being accessed, if there is one.

        NOTE(review): nothing is written back here, not even when
        ``is_modified`` is set.
        """
        if self.fileobj:
            self.fileobj.close()

    def ensure_subtype_smaller(self, sdna_index_curr, sdna_index_next):
        """Raise RuntimeError when the struct at ``sdna_index_curr`` is larger
        than the struct at ``sdna_index_next``.

        Both parameters are indices into ``self.structs``.
        """
        # never refine to a smaller type
        struct_curr = self.structs[sdna_index_curr]
        struct_next = self.structs[sdna_index_next]
        if struct_curr.size > struct_next.size:
            raise RuntimeError("cant refine to smaller type (%s -> %s)" %
                               (struct_curr.dna_type_id.decode('ascii'),
                                struct_next.dna_type_id.decode('ascii')))

    def decode_structs(self, block: 'BlendFileBlock'):
        """Decode the DNA catalog stored in the DNA1 file-block.

        Reads ``block.size`` bytes from the current position of the file
        object and parses, in order: the names, the types, the type lengths
        and the struct definitions.

        :param block: the DNA1 block; the file object must be positioned at
            the start of its data.
        :returns: tuple ``(structs, sdna_index_from_id)``; the list of structs
            in file order, and a mapping from struct type ID to its index in
            that list.
        """
        self.log.debug("building DNA catalog")
        shortstruct = self.header.types.USHORT
        shortstruct2 = self.header.types.USHORT2
        intstruct = self.header.types.UINT
        assert intstruct.size == 4

        data = self.fileobj.read(block.size)
        types = []
        typenames = []
        structs = []
        sdna_index_from_id = {}

        # The first 8 bytes are skipped (presumably two 4-byte identifiers --
        # confirm against the file format).
        offset = 8
        names_len = intstruct.unpack_from(data, offset)[0]
        offset += 4

        self.log.debug("building #%d names", names_len)
        for _ in range(names_len):
            typename = dna_io.read_data0_offset(data, offset)
            offset = offset + len(typename) + 1  # +1 for the terminator byte
            typenames.append(DNAName(typename))

        # Each section starts on a 4-byte boundary and its first 4 bytes are
        # skipped (presumably a section identifier -- confirm).
        offset = pad_up_4(offset)
        offset += 4
        types_len = intstruct.unpack_from(data, offset)[0]
        offset += 4

        self.log.debug("building #%d types", types_len)
        for _ in range(types_len):
            dna_type_id = dna_io.read_data0_offset(data, offset)
            types.append(DNAStruct(dna_type_id))
            offset += len(dna_type_id) + 1

        offset = pad_up_4(offset)
        offset += 4

        self.log.debug("building #%d type-lengths", types_len)
        for dna_type in types:
            dna_type.size = shortstruct.unpack_from(data, offset)[0]
            offset += 2

        offset = pad_up_4(offset)
        offset += 4
        structs_len = intstruct.unpack_from(data, offset)[0]
        offset += 4

        self.log.debug("building #%d structures", structs_len)
        pointer_size = self.header.pointer_size
        for sdna_index in range(structs_len):
            struct_type_index, fields_len = shortstruct2.unpack_from(data, offset)
            offset += 4

            dna_struct = types[struct_type_index]
            sdna_index_from_id[dna_struct.dna_type_id] = sdna_index
            structs.append(dna_struct)

            # Fields are laid out back to back; dna_offset tracks the offset
            # of the current field inside the struct.
            dna_offset = 0
            for _ in range(fields_len):
                field_type_index, field_name_index = shortstruct2.unpack_from(data, offset)
                offset += 4

                dna_type = types[field_type_index]
                dna_name = typenames[field_name_index]

                if dna_name.is_pointer or dna_name.is_method_pointer:
                    dna_size = pointer_size * dna_name.array_size
                else:
                    dna_size = dna_type.size * dna_name.array_size

                field = DNAField(dna_type, dna_name, dna_size, dna_offset)
                dna_struct.fields.append(field)
                dna_struct.field_from_name[dna_name.name_only] = field
                dna_offset += dna_size

        return structs, sdna_index_from_id
class BlendFileHeader:
    """
    BlendFileHeader represents the first 12 bytes of a blend file.

    It contains information about the hardware architecture, which is relevant
    to the structure of the rest of the file.

    :ivar magic: the first 7 bytes of the header.
    :ivar pointer_size: size of a pointer in bytes, either 4 or 8.
    :ivar types: dna_io.LittleEndianTypes or dna_io.BigEndianTypes.
    :ivar endian_str: byte-order prefix (b'<' or b'>') for struct.Struct().
    :ivar version: the version field of the header, as int.
    """
    log = log.getChild('BlendFileHeader')
    # magic (7 bytes), pointer size ID, endian ID, version (3 bytes).
    structure = struct.Struct(b'7s1s1s3s')

    def __init__(self, fileobj: typing.BinaryIO, path: pathlib.Path):
        """Read and interpret the header from the current position of fileobj.

        :param fileobj: file object positioned at the start of the header.
        :param path: path of the file, used for logging and error reporting.
        :raises exceptions.BlendFileError: when the header is too short or
            contains an invalid pointer size, endian indicator or version.
        """
        self.log.debug("reading blend-file-header %s", path)
        header = fileobj.read(self.structure.size)
        if len(header) != self.structure.size:
            # Report a short read the same way as other invalid headers,
            # rather than letting struct.error escape from unpack().
            raise exceptions.BlendFileError(
                'file too small for a blend file header, '
                'expected %d bytes but could read only %d'
                % (self.structure.size, len(header)), path)
        self.magic, pointer_size_id, endian_id, version_id = self.structure.unpack(header)

        if pointer_size_id == b'-':
            self.pointer_size = 8
        elif pointer_size_id == b'_':
            self.pointer_size = 4
        else:
            raise exceptions.BlendFileError('invalid pointer size %r' % pointer_size_id, path)

        if endian_id == b'v':
            self.types = dna_io.LittleEndianTypes
            self.endian_str = b'<'  # indication for struct.Struct()
        elif endian_id == b'V':
            self.types = dna_io.BigEndianTypes
            self.endian_str = b'>'  # indication for struct.Struct()
        else:
            raise exceptions.BlendFileError('invalid endian indicator %r' % endian_id, path)

        try:
            self.version = int(version_id)
        except ValueError as ex:
            raise exceptions.BlendFileError('invalid version %r' % version_id, path) from ex

    def create_block_header_struct(self) -> struct.Struct:
        """Create a Struct instance for parsing data block headers.

        The layout is a 4-byte code, an unsigned int, a pointer-sized unsigned
        integer and two more unsigned ints, in the byte order of this file.
        """
        pointer_format = b'I' if self.pointer_size == 4 else b'Q'
        return struct.Struct(b''.join((
            self.endian_str,
            b'4sI',
            pointer_format,
            b'II',
        )))
class BlendFileBlock:
    """
    Instance of a struct: one data block of a blend file.

    :ivar file: the BlendFile this block belongs to.
    :ivar code: block code as bytes, e.g. b'DNA1' or b'ENDB'.
    :ivar size: second field of the block header; used as the number of
        data bytes that follow the header.
    :ivar addr_old: third field of the block header; BlendFile indexes
        blocks by this value to resolve pointers.
    :ivar sdna_index: index into ``file.structs`` of this block's struct.
    :ivar count: fifth field of the block header; ``size // count`` is used
        as the stride between the items of the block.
    :ivar file_offset: offset in bytes from start of file to beginning of the
        data block. Points to the data after the block header.
    """
    log = log.getChild('BlendFileBlock')
    # old blend files ENDB block structure
    old_structure = struct.Struct(b'4sI')

    def __init__(self, fileobj: typing.BinaryIO, bfile: 'BlendFile'):
        """Read one block header from the current position of fileobj.

        A header that cannot be read completely is reported with a warning
        and turned into an ENDB block.

        :param fileobj: file object positioned at the start of a block header.
        :param bfile: the blend file this block is part of.
        """
        self.file = bfile

        # Defaults; actual values are set by interpreting the block header.
        self.code = b''
        self.size = 0
        self.addr_old = 0
        self.sdna_index = 0
        self.count = 0
        self.file_offset = 0

        header_struct = bfile.block_header_struct
        data = fileobj.read(header_struct.size)
        if len(data) != header_struct.size:
            self.log.warning("Blend file %s seems to be truncated, "
                             "expected %d bytes but could read only %d",
                             bfile.filepath, header_struct.size, len(data))
            self.code = b'ENDB'
            return

        # header size can be 8, 20, or 24 bytes long
        # 8: old blend files ENDB block (exception)
        # 20: normal headers 32 bit platform
        # 24: normal headers 64 bit platform
        # NOTE(review): with the length check above this branch is only
        # reachable when the block header struct itself is at most 15 bytes.
        if len(data) <= 15:
            self.log.debug('interpreting block as old-style ENB block')
            blockheader = self.old_structure.unpack(data)
            self.code = dna_io.read_data0(blockheader[0])
            return

        blockheader = header_struct.unpack(data)
        self.code = dna_io.read_data0(blockheader[0])
        if self.code != b'ENDB':
            self.size = blockheader[1]
            self.addr_old = blockheader[2]
            self.sdna_index = blockheader[3]
            self.count = blockheader[4]
            self.file_offset = fileobj.tell()

    def __str__(self):
        return "<%s.%s (%s), size=%d at %s>" % (
            self.__class__.__name__,
            self.dna_type_name,
            self.code.decode(),
            self.size,
            hex(self.addr_old),
        )

    @property
    def dna_type(self):
        """The struct from ``file.structs`` at this block's ``sdna_index``."""
        return self.file.structs[self.sdna_index]

    @property
    def dna_type_name(self):
        """The type ID of this block's struct, decoded as ASCII."""
        return self.dna_type.dna_type_id.decode('ascii')

    def refine_type_from_index(self, sdna_index_next):
        """Change this block's struct to the one at index ``sdna_index_next``.

        :raises RuntimeError: when the current struct is larger than the new one.
        """
        assert (type(sdna_index_next) is int)
        sdna_index_curr = self.sdna_index
        self.file.ensure_subtype_smaller(sdna_index_curr, sdna_index_next)
        self.sdna_index = sdna_index_next

    def refine_type(self, dna_type_id):
        """Change this block's struct to the one with the given type ID (bytes)."""
        assert (type(dna_type_id) is bytes)
        self.refine_type_from_index(self.file.sdna_index_from_id[dna_type_id])

    def _seek_to_item(self, base_index):
        """Position the file at the start of item ``base_index`` of this block."""
        ofs = self.file_offset
        if base_index != 0:
            assert (base_index < self.count)
            ofs += (self.size // self.count) * base_index
        self.file.fileobj.seek(ofs, os.SEEK_SET)

    def _refined_struct(self, sdna_index_refine):
        """Return the struct to interpret this block with.

        ``None`` selects the block's own struct; any other index is checked
        with BlendFile.ensure_subtype_smaller() first.
        """
        if sdna_index_refine is None:
            sdna_index_refine = self.sdna_index
        else:
            self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
        return self.file.structs[sdna_index_refine]

    def get_file_offset(self, path,
                        default=...,
                        sdna_index_refine=None,
                        base_index=0,
                        ):
        """
        Return (offset, length)

        ``offset`` is the file position after looking up the field at ``path``
        and ``length`` is the array size of that field. ``default`` is not
        used; it is kept for signature compatibility.
        """
        assert (type(path) is bytes)

        self._seek_to_item(base_index)
        dna_struct = self._refined_struct(sdna_index_refine)
        fileobj = self.file.fileobj
        field = dna_struct.field_from_path(self.file.header, fileobj, path)

        return (fileobj.tell(), field.dna_name.array_size)

    def get(self, path,
            default=...,
            sdna_index_refine=None,
            use_nil=True, use_str=True,
            base_index=0,
            ):
        """Return the value of the field at ``path``.

        Seeks to item ``base_index`` of this block and delegates to the
        ``field_get()`` of the (optionally refined) struct, passing on
        ``default``, ``use_nil`` and ``use_str``.
        """
        self._seek_to_item(base_index)
        dna_struct = self._refined_struct(sdna_index_refine)
        return dna_struct.field_get(
            self.file.header, self.file.fileobj, path,
            default=default,
            use_nil=use_nil, use_str=use_str,
        )

    def get_recursive_iter(self, path, path_root=b"",
                           default=...,
                           sdna_index_refine=None,
                           use_nil=True, use_str=True,
                           base_index=0,
                           ):
        """Yield ``(path, value)`` pairs for the field at ``path``.

        When get() raises NotImplementedError for the field, the fields of
        its struct type are yielded recursively instead; a type without a
        known struct yields a ``"<type>"`` placeholder string.
        """
        if path_root:
            path_full = (
                (path_root if type(path_root) is tuple else (path_root,)) +
                (path if type(path) is tuple else (path,)))
        else:
            path_full = path

        try:
            yield (path_full,
                   self.get(path_full, default, sdna_index_refine, use_nil, use_str, base_index))
        except NotImplementedError as ex:
            msg, dna_name, dna_type = ex.args
            struct_index = self.file.sdna_index_from_id.get(dna_type.dna_type_id, None)
            if struct_index is None:
                yield (path_full, "<%s>" % dna_type.dna_type_id.decode('ascii'))
            else:
                struct = self.file.structs[struct_index]
                for f in struct.fields:
                    yield from self.get_recursive_iter(
                        f.dna_name.name_only, path_full, default, None, use_nil, use_str, 0)

    def items_recursive_iter(self):
        """Yield ``(path, value)`` pairs for all fields, recursing into structs."""
        for k in self.keys():
            yield from self.get_recursive_iter(k, use_str=False)

    def get_data_hash(self):
        """
        Generates a 'hash' that can be used instead of addr_old as block id, and that should be 'stable' across .blend
        file load & save (i.e. it does not changes due to pointer addresses variations).
        """
        # TODO This implementation is most likely far from optimal... and Adler-32 is not renowned as the best
        # hashing algo either. But for now does the job!
        import zlib

        dna_struct = self.file.structs[self.sdna_index]

        def _is_pointer(k):
            return dna_struct.field_from_path(
                self.file.header, self.file.fileobj, k).dna_name.is_pointer

        hsh = 1
        for k, v in self.items_recursive_iter():
            # Pointer values are skipped, they are not stable across saves.
            if not _is_pointer(k):
                hsh = zlib.adler32(str(v).encode(), hsh)
        return hsh

    def set(self, path, value,
            sdna_index_refine=None,
            ):
        """Set the field at ``path`` to ``value`` and mark the file as modified.

        Seeks to the start of this block's data and delegates to the
        ``field_set()`` of the (optionally refined) struct.
        """
        dna_struct = self._refined_struct(sdna_index_refine)
        self.file.fileobj.seek(self.file_offset, os.SEEK_SET)
        self.file.is_modified = True
        return dna_struct.field_set(
            self.file.header, self.file.fileobj, path, value)

    # ---------------
    # Utility get/set
    #
    #   avoid inline pointer casting
    def get_pointer(
            self, path,
            default=...,
            sdna_index_refine=None,
            base_index=0,
    ):
        """Return the block that the pointer field at ``path`` points to.

        :returns: the result of get() when that is not an int (the default),
            None for a null pointer, otherwise the block found by
            BlendFile.find_block_from_offset(), which may be None as well.
        """
        if sdna_index_refine is None:
            sdna_index_refine = self.sdna_index
        result = self.get(path, default, sdna_index_refine=sdna_index_refine, base_index=base_index)

        # default
        if type(result) is not int:
            return result

        assert (self.file.structs[sdna_index_refine].field_from_path(
            self.file.header, self.file.fileobj, path).dna_name.is_pointer)
        if result == 0:
            return None

        # possible (but unlikely)
        # that this fails and returns None
        # maybe we want to raise some exception in this case
        return self.file.find_block_from_offset(result)

    # ----------------------
    # Python convenience API
    # dict like access
    def __getitem__(self, item):
        return self.get(item, use_str=False)

    def __setitem__(self, item, value):
        self.set(item, value)

    def keys(self):
        """Return a generator of the field names of this block's struct."""
        return (f.dna_name.name_only for f in self.dna_type.fields)

    def values(self):
        """Yield the value of every field; a ``"<type>"`` placeholder string
        is yielded for fields whose lookup raises NotImplementedError."""
        for k in self.keys():
            try:
                yield self[k]
            except NotImplementedError as ex:
                msg, dna_name, dna_type = ex.args
                yield "<%s>" % dna_type.dna_type_id.decode('ascii')

    def items(self):
        """Yield ``(name, value)`` for every field; the value is a
        ``"<type>"`` placeholder string for fields whose lookup raises
        NotImplementedError."""
        for k in self.keys():
            try:
                yield (k, self[k])
            except NotImplementedError as ex:
                msg, dna_name, dna_type = ex.args
                yield (k, "<%s>" % dna_type.dna_type_id.decode('ascii'))