Support for modifying string/bytes fields.

This commit is contained in:
Sybren A. Stüvel 2018-02-23 13:47:06 +01:00
parent efc60f437b
commit b42a090f32
8 changed files with 143 additions and 45 deletions

View File

@ -63,6 +63,7 @@ class BlendFile:
:param mode: see mode description of pathlib.Path.open()
"""
self.filepath = path
self._is_modified = False
fileobj = path.open(mode, buffering=FILE_BUFFER_SIZE)
magic = fileobj.read(len(BLENDFILE_MAGIC))
@ -136,6 +137,15 @@ class BlendFile:
def __exit__(self, exctype, excvalue, traceback):
self.close()
@property
def is_modified(self) -> bool:
return self._is_modified
def mark_modified(self):
"""Recompess the file when it is closed."""
self.log.debug('Marking %s as modified', self.raw_filepath)
self._is_modified = True
def find_blocks_from_code(self, code: bytes) -> typing.List['BlendFileBlock']:
assert isinstance(code, bytes)
return self.code_index[code]
@ -151,10 +161,27 @@ class BlendFile:
def close(self):
"""Close the blend file.
Writes the blend file to disk if it was changed.
Recompresses the blend file if it was compressed and changed.
"""
if self.fileobj:
self.fileobj.close()
if not self.fileobj:
return
if self._is_modified and self.is_compressed:
log.debug("recompressing modified blend file %s", self.raw_filepath)
self.fileobj.seek(os.SEEK_SET, 0)
with gzip.open(self.filepath, 'wb') as gzfile:
while True:
data = self.fileobj.read(FILE_BUFFER_SIZE)
if not data:
break
gzfile.write(data)
log.debug("compressing to %s finished", self.filepath)
# Close the file object after recompressing, as it may be a temporary
# file that'll disappear as soon as we close it.
self.fileobj.close()
self._is_modified = False
def ensure_subtype_smaller(self, sdna_index_curr, sdna_index_next):
# never refine to a smaller type
@ -443,8 +470,8 @@ class BlendFileBlock:
def set(self, path: dna.FieldPath, value): # TODO(Sybren): port to BAT
dna_struct = self.bfile.structs[self.sdna_index]
self.bfile.mark_modified()
self.bfile.fileobj.seek(self.file_offset, os.SEEK_SET)
self.bfile.is_modified = True
return dna_struct.field_set(self.bfile.header, self.bfile.fileobj, path, value)
def get_pointer(

View File

@ -213,17 +213,17 @@ class Struct:
dna_type = field.dna_type
dna_name = field.name
types = file_header.endian
endian = file_header.endian
# Some special cases (pointers, strings/bytes)
if dna_name.is_pointer:
return types.read_pointer(fileobj, file_header.pointer_size)
return endian.read_pointer(fileobj, file_header.pointer_size)
if dna_type.dna_type_id == b'char':
if field.size == 1:
# Single char, assume it's bitflag or int value, and not a string/bytes data...
return types.read_char(fileobj)
return endian.read_char(fileobj)
if null_terminated or (null_terminated is None and as_str):
data = types.read_bytes0(fileobj, dna_name.array_size)
data = endian.read_bytes0(fileobj, dna_name.array_size)
else:
data = fileobj.read(dna_name.array_size)
@ -232,10 +232,10 @@ class Struct:
return data
simple_readers = {
b'int': types.read_int,
b'short': types.read_short,
b'uint64_t': types.read_ulong,
b'float': types.read_float,
b'int': endian.read_int,
b'short': endian.read_short,
b'uint64_t': endian.read_ulong,
b'float': endian.read_float,
}
try:
simple_reader = simple_readers[dna_type.dna_type_id]
@ -255,22 +255,33 @@ class Struct:
return [simple_reader(fileobj) for _ in range(dna_name.array_size)]
return simple_reader(fileobj)
def field_set(self, header, handle, path, value):
def field_set(self,
file_header: header.BlendFileHeader,
fileobj: typing.BinaryIO,
path: bytes,
value: typing.Any):
"""Write a value to the blend file.
Assumes the file pointer of `fileobj` is seek()ed to the start of the
struct on disk (e.g. the start of the BlendFileBlock containing the
data).
"""
assert (type(path) == bytes)
field = self.field_from_path(header, handle, path)
if field is None:
raise KeyError("%r not found in %r" %
(path, [f.dna_name.name_only for f in self.fields]))
field, offset = self.field_from_path(file_header.pointer_size, path)
dna_type = field.dna_type
dna_name = field.dna_name
dna_name = field.name
endian = file_header.endian
if dna_type.dna_type_id == b'char':
if type(value) is str:
return DNA_IO.write_string(handle, value, dna_name.array_size)
else:
return DNA_IO.write_bytes(handle, value, dna_name.array_size)
if dna_type.dna_type_id != b'char':
msg = "Setting type %r is not supported for %s.%s" % (
dna_type, self.dna_type_id.decode(), dna_name.name_full.decode())
raise exceptions.NoWriterImplemented(msg, dna_name, dna_type)
fileobj.seek(offset, os.SEEK_CUR)
if isinstance(value, str):
return endian.write_string(fileobj, value, dna_name.array_size)
else:
raise NotImplementedError("Setting %r is not yet supported for %r" %
(dna_type, dna_name), dna_name, dna_type)
return endian.write_bytes(fileobj, value, dna_name.array_size)

View File

@ -61,20 +61,33 @@ class EndianIO:
raise ValueError('unsupported pointer size %d' % pointer_size)
@classmethod
def write_string(cls, fileobj: typing.BinaryIO, astring: str, fieldlen: int):
def write_string(cls, fileobj: typing.BinaryIO, astring: str, fieldlen: int) -> int:
"""Write a (truncated) string as UTF-8.
The string will always be written 0-terminated.
:returns: the number of bytes written.
"""
assert isinstance(astring, str)
# TODO: truncate the string on a UTF-8 character boundary to avoid creating invalid UTF-8.
cls.write_bytes(fileobj, astring.encode('utf-8'), fieldlen)
encoded = astring.encode('utf-8')[:fieldlen-1] + b'\0'
return fileobj.write(encoded)
@classmethod
def write_bytes(cls, fileobj: typing.BinaryIO, data: bytes, fieldlen: int):
def write_bytes(cls, fileobj: typing.BinaryIO, data: bytes, fieldlen: int) -> int:
"""Write (truncated) bytes.
When len(data) < fieldlen, a terminating b'\0' will be appended.
:returns: the number of bytes written.
"""
assert isinstance(data, (bytes, bytearray))
if len(data) >= fieldlen:
to_write = data[0:fieldlen]
else:
to_write = data + b'\0'
fileobj.write(to_write)
return fileobj.write(to_write)
@classmethod
def read_bytes0(cls, fileobj, length):

View File

@ -54,6 +54,19 @@ class NoReaderImplemented(NotImplementedError):
self.dna_type = dna_type
class NoWriterImplemented(NotImplementedError):
"""Raised when writing a property of a non-implemented type.
:type dna_name: blender_asset_tracer.blendfile.dna.Name
:type dna_type: blender_asset_tracer.blendfile.dna.Struct
"""
def __init__(self, message: str, dna_name, dna_type):
super().__init__(message)
self.dna_name = dna_name
self.dna_type = dna_type
class SegmentationFault(Exception):
"""Raised when a pointer to a non-existant datablock was dereferenced."""

15
tests/abstract_test.py Normal file
View File

@ -0,0 +1,15 @@
import pathlib
import unittest
class AbstractBlendFileTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.blendfiles = pathlib.Path(__file__).with_name('blendfiles')
def setUp(self):
self.bf = None
def tearDown(self):
if self.bf:
self.bf.close()

Binary file not shown.

View File

@ -1,23 +1,8 @@
import pathlib
import unittest
import os
from blender_asset_tracer import blendfile
from blender_asset_tracer.blendfile import iterators
class AbstractBlendFileTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.blendfiles = pathlib.Path(__file__).with_name('blendfiles')
def setUp(self):
self.bf = None
def tearDown(self):
if self.bf:
self.bf.close()
from abstract_test import AbstractBlendFileTest
class BlendFileBlockTest(AbstractBlendFileTest):

View File

@ -0,0 +1,34 @@
from shutil import copyfile
from blender_asset_tracer import blendfile
from abstract_test import AbstractBlendFileTest
class ModifyUncompressedTest(AbstractBlendFileTest):
def setUp(self):
self.orig = self.blendfiles / 'linked_cube.blend'
self.to_modify = self.orig.with_name('linked_cube_modified.blend')
copyfile(str(self.orig), str(self.to_modify)) # TODO: when requiring Python 3.6+, remove str()
self.bf = blendfile.BlendFile(self.to_modify, mode='r+b')
self.assertFalse(self.bf.is_compressed)
def tearDown(self):
if self.to_modify.exists():
self.to_modify.unlink()
def test_change_path(self):
library = self.bf.code_index[b'LI'][0]
# Change it from absolute to relative.
library[b'filepath'] = b'//basic_file.blend'
library[b'name'] = b'//basic_file.blend'
# Reload the blend file to inspect that it was written properly.
self.bf.close()
self.bf = blendfile.BlendFile(self.to_modify, mode='r+b')
library = self.bf.code_index[b'LI'][0]
self.assertEqual(b'//basic_file.blend', library[b'filepath'])
self.assertEqual(b'//basic_file.blend', library[b'name'])