This repository has been archived on 2026-05-04. You can view files and clone it, but cannot push or open issues or pull requests.
blender_asset_tracer/blendfile/magic_compression.py
Joseph HENRY c298e448b0 Harden BAT against missing/broken libraries and zstandard failures
- ExportBatPack and BAT_OT_export_zip: catch EnvironmentError (typically
  raised when zstandard is missing or the installed wheel is incompatible
  with the embedded Python) and any other exception, and report it via
  self.report instead of crashing the operator.
- file2blocks.BlockIterator: skip libraries that fail to open with a
  warning instead of aborting the whole trace, so production blends with
  one stale linked library still pack.
- preferences: use __package__ (stable, full dotted path) for the
  AddonPreferences bl_idname instead of __name__.split('.')[0], which
  breaks when the add-on is loaded under an unexpected module name.
- magic_compression: zstandard handling tweaks alongside the operator
  error reports above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:24:32 +02:00

186 lines
5.8 KiB
Python
Executable File

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
#
# (c) 2021, Blender Foundation
import collections
import enum
import gzip
import logging
import os
import pathlib
import tempfile
import typing
# Blender 3.0 replaces GZip with ZStandard compression.
# Since this is not a standard library package, be careful importing it and
# treat it as optional.
try:
import zstandard
has_zstandard = True
except ImportError:
has_zstandard = False
from . import exceptions
# Magic numbers, see https://en.wikipedia.org/wiki/List_of_file_signatures
BLENDFILE_MAGIC = b"BLENDER"
GZIP_MAGIC = b"\x1f\x8b"
# ZStandard has two magic numbers, the 2nd of which doesn't use the last nibble.
# See https://tools.ietf.org/id/draft-kucherawy-dispatch-zstd-00.html#rfc.section.2.1.1
# and https://tools.ietf.org/id/draft-kucherawy-dispatch-zstd-00.html#rfc.section.2.3
ZSTD_MAGIC = b"\x28\xB5\x2F\xFD"
ZSTD_MAGIC_SKIPPABLE = b"\x50\x2A\x4D\x18"
ZSTD_MAGIC_SKIPPABLE_MASK = b"\xF0\xFF\xFF\xFF"
log = logging.getLogger(__name__)
# @dataclasses.dataclass
DecompressedFileInfo = collections.namedtuple(
"DecompressedFileInfo", "is_compressed path fileobj"
)
# is_compressed: bool
# path: pathlib.Path
# """The path of the decompressed file, or the input path if the file is not compressed."""
# fileobj: BinaryIO
class Compression(enum.Enum):
UNRECOGNISED = -1
NONE = 0
GZIP = 1
ZSTD = 2
def open(path: pathlib.Path, mode: str, buffer_size: int) -> DecompressedFileInfo:
"""Open the file, decompressing it into a temporary file if necesssary."""
fileobj = path.open(mode, buffering=buffer_size) # typing.IO[bytes]
compression = find_compression_type(fileobj)
if compression == Compression.UNRECOGNISED:
fileobj.close()
raise exceptions.BlendFileError("File is not a blend file", path)
if compression == Compression.NONE:
return DecompressedFileInfo(
is_compressed=False,
path=path,
fileobj=fileobj,
)
log.debug("%s-compressed blendfile detected: %s", compression.name, path)
print("BAT: %s-compressed blendfile: %s" % (compression.name, path))
# Decompress to a temporary file.
tmpfile = tempfile.NamedTemporaryFile()
fileobj.seek(0, os.SEEK_SET)
_decompress_to_file(fileobj, tmpfile, mode, compression, buffer_size)
# Verify the decompressed content is a valid blend file.
tmpfile.seek(0, os.SEEK_SET)
header_bytes = tmpfile.read(12)
log.debug("Decompressed header (%s, %s): %r", compression.name, path, header_bytes)
print("BAT: Decompressed header: %r" % header_bytes)
if not header_bytes.startswith(BLENDFILE_MAGIC):
raise exceptions.BlendFileError(
"Decompressed file is not a blend file (header: %r)" % header_bytes, path
)
tmpfile.seek(0, os.SEEK_SET)
# Further interaction should be done with the uncompressed file.
fileobj.close()
return DecompressedFileInfo(
is_compressed=True,
path=pathlib.Path(tmpfile.name),
fileobj=tmpfile,
)
def _decompress_to_file(
fileobj: typing.IO[bytes],
tmpfile: typing.IO[bytes],
mode: str,
compression: Compression,
buffer_size: int,
) -> None:
"""Decompress fileobj into tmpfile."""
if compression == Compression.GZIP:
with gzip.GzipFile(fileobj=fileobj, mode=mode) as gz:
while True:
data = gz.read(buffer_size)
if not data:
break
tmpfile.write(data)
elif compression == Compression.ZSTD:
if not has_zstandard:
raise EnvironmentError(
"File is compressed with ZStandard, install the `zstandard` module to support this."
)
dctx = zstandard.ZstdDecompressor()
dctx.copy_stream(fileobj, tmpfile)
else:
raise ValueError("Unsupported compression type: %s" % compression)
tmpfile.flush()
def find_compression_type(fileobj: typing.IO[bytes]) -> Compression:
fileobj.seek(0, os.SEEK_SET)
# This assumes that all magics are not longer than "BLENDER".
magic = fileobj.read(len(BLENDFILE_MAGIC))
if _matches_magic(magic, BLENDFILE_MAGIC):
return Compression.NONE
if _matches_magic(magic, GZIP_MAGIC):
return Compression.GZIP
if _matches_magic(magic, ZSTD_MAGIC):
return Compression.ZSTD
if _matches_magic_masked(magic, ZSTD_MAGIC_SKIPPABLE, ZSTD_MAGIC_SKIPPABLE_MASK):
return Compression.ZSTD
return Compression.UNRECOGNISED
def _matches_magic_masked(value: bytes, magic: bytes, mask: bytes) -> bool:
"""Returns True only if value & mask == magic & mask (ignoring trailing bytes in value)."""
assert len(magic) == len(mask)
int_value = int.from_bytes(value[: len(magic)], "little")
int_magic = int.from_bytes(magic, "little")
int_mask = int.from_bytes(mask, "little")
return int_value & int_mask == int_magic & int_mask
def _matches_magic(value: bytes, magic: bytes) -> bool:
"""Returns True only if value == magic (ignoring trailing bytes in value)."""
return value[: len(magic)] == magic