# ***** BEGIN GPL LICENSE BLOCK ***** # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # ***** END GPL LICENCE BLOCK ***** # # (c) 2018, Blender Foundation - Sybren A. Stüvel """ZIP file packer. Note: There is no official file name encoding for ZIP files. Expect trouble when you want to use the ZIP cross-platform and you have non-ASCII names. """ import datetime import io import logging import pathlib import platform import sys import typing import zipfile from . import Packer, transfer log = logging.getLogger(__name__) # Suffixes to store uncompressed in the zip. STORE_ONLY = {".jpg", ".jpeg", ".exr"} # Filename used for the embedded debug log inside produced zips. LOG_FILENAME_IN_ZIP = "bat-debug.log" class ZipPacker(Packer): """Creates a zipped BAT Pack instead of a directory. Captures the most verbose BAT log output in memory while the pack runs and embeds it as ``bat-debug.log`` inside the produced zip, so the archive is self-describing for post-mortem debugging. """ def __init__( self, bfile: typing.Union[pathlib.Path, typing.List[pathlib.Path]], project: pathlib.Path, target: str, *, noop: bool = False, compress: bool = False, relative_only: bool = False, keep_hierarchy: bool = False, ) -> None: super().__init__( bfile, project, target, noop=noop, compress=compress, relative_only=relative_only, keep_hierarchy=keep_hierarchy, ) self._log_buffer: io.StringIO = io.StringIO() self._log_handler: typing.Optional[logging.Handler] = logging.StreamHandler( self._log_buffer ) self._log_handler.setLevel(logging.DEBUG) self._log_handler.setFormatter( logging.Formatter( fmt="%(asctime)s.%(msecs)03d %(levelname)-8s %(name)-45s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) ) self._bat_logger: logging.Logger = logging.getLogger("blender_asset_tracer") self._original_logger_level: int = self._bat_logger.level try: self._bat_logger.setLevel(logging.DEBUG) self._bat_logger.addHandler(self._log_handler) except Exception: self._bat_logger.removeHandler(self._log_handler) self._bat_logger.setLevel(self._original_logger_level) self._log_handler = None raise self._log_pack_banner() def _log_pack_banner(self) -> None: """Emit a header summarising the environment for the captured log.""" try: from blender_asset_tracer import __version__ as bat_version except Exception: bat_version = "unknown" log.info("=" * 78) log.info("BAT Pack starting") log.info("BAT version : %s", bat_version) log.info( "UTC timestamp : %s", datetime.datetime.now(datetime.timezone.utc).isoformat(), ) log.info("Blend files : %s", self.blendfiles) log.info("Project root : %s", self.project) log.info("Target : %s", self.target) log.info("noop : %s", self.noop) log.info("relative_only : %s", self.relative_only) log.info("keep_hierarchy : %s", self.keep_hierarchy) log.info("sys.argv : %s", sys.argv) log.info("Python : %s", sys.version.replace("\n", " ")) log.info("Platform : %s", platform.platform()) log.info("=" * 78) def _create_file_transferer(self) -> transfer.FileTransferer: target_path = pathlib.Path(self._target_path) return ZipTransferrer(target_path.absolute()) def close(self) -> None: """Flush the captured debug log into the zip, then run base cleanup.""" if self._log_handler is None: super().close() return log.info("BAT Pack closing; flushing debug log to %s", LOG_FILENAME_IN_ZIP) self._bat_logger.removeHandler(self._log_handler) self._bat_logger.setLevel(self._original_logger_level) handler = self._log_handler self._log_handler = None try: handler.flush() log_content = self._log_buffer.getvalue() zippath = pathlib.Path(self.target) if not self.noop and zippath.exists(): with zipfile.ZipFile(str(zippath), "a") as outzip: outzip.writestr(LOG_FILENAME_IN_ZIP, log_content) except Exception: log.exception("Failed to embed debug log into %s", self.target) super().close() class ZipTransferrer(transfer.FileTransferer): """Creates a ZIP file instead of writing to a directory. Note: There is no official file name encoding for ZIP files. If you have unicode file names, they will be encoded as UTF-8. WinZip interprets all file names as encoded in CP437, also known as DOS Latin. """ def __init__(self, zippath: pathlib.Path) -> None: super().__init__() self.zippath = zippath def run(self) -> None: import zipfile zippath = self.zippath.absolute() log.info("Writing ZIP file to %s", zippath) with zipfile.ZipFile(str(zippath), "w") as outzip: for src, dst, act in self.iter_queue(): assert src.is_absolute(), "expecting only absolute paths, not %r" % src dst = pathlib.Path(dst).absolute() try: relpath = dst.relative_to(zippath) # Don't bother trying to compress already-compressed files. if src.suffix.lower() in STORE_ONLY: compression = zipfile.ZIP_STORED log.debug("ZIP %s -> %s (uncompressed)", src, relpath) else: compression = zipfile.ZIP_DEFLATED log.debug("ZIP %s -> %s", src, relpath) outzip.write( str(src), arcname=str(relpath), compress_type=compression ) if act == transfer.Action.MOVE: self.delete_file(src) except Exception: # We have to catch exceptions in a broad way, as this is running in # a separate thread, and exceptions won't otherwise be seen. log.exception("Error writing %s to ZIP archive at %s", src, dst) # Put the files to copy back into the queue, and abort. This allows # the main thread to inspect the queue and see which files were not # copied. The one we just failed (due to this exception) should also # be reported there. self.queue.put((src, dst, act)) return log.info("Finished writing ZIP file: %s", zippath)