diff --git a/.gitignore b/.gitignore index 7f0097a..ee360fc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.venv *.pyc *.blend[1-9] __pycache__ diff --git a/blender_asset_tracer/__init__.py b/blender_asset_tracer/__init__.py index 6d27b73..9b60cef 100644 --- a/blender_asset_tracer/__init__.py +++ b/blender_asset_tracer/__init__.py @@ -58,18 +58,42 @@ if _HAS_BPY: classes = ( preferences.BATPreferences, + operators.BAT_SequenceFileEntry, operators.ExportBatPack, operators.BAT_OT_export_zip, + operators.BAT_OT_scan_sequence, + operators.BAT_OT_sequence_pack, ) def register(): for cls in classes: bpy.utils.register_class(cls) bpy.types.TOPBAR_MT_file_external_data.append(operators.menu_func) + bpy.types.WindowManager.bat_sequence_template = bpy.props.EnumProperty( + name="Studio Template", + description="Folder convention used to find published blend files", + items=operators.STUDIO_TEMPLATE_ITEMS, + ) + bpy.types.WindowManager.bat_sequence_task = bpy.props.EnumProperty( + name="Task", + description="Which task folder to scan (for templates that require it)", + items=operators.TASK_CHOICE_ITEMS, + ) + bpy.types.WindowManager.bat_sequence_dir = bpy.props.StringProperty( + name="Sequence Directory", + description="Root folder of the sequence last scanned for published blend files", + ) + bpy.types.WindowManager.bat_sequence_files = bpy.props.CollectionProperty( + type=operators.BAT_SequenceFileEntry, + ) def unregister(): bpy.types.TOPBAR_MT_file_external_data.remove(operators.menu_func) - for cls in classes: + del bpy.types.WindowManager.bat_sequence_files + del bpy.types.WindowManager.bat_sequence_dir + del bpy.types.WindowManager.bat_sequence_task + del bpy.types.WindowManager.bat_sequence_template + for cls in reversed(classes): bpy.utils.unregister_class(cls) if __name__ == "__main__": diff --git a/blender_asset_tracer/bpathlib.py b/blender_asset_tracer/bpathlib.py index b2d7628..daa1b14 100644 --- a/blender_asset_tracer/bpathlib.py +++ b/blender_asset_tracer/bpathlib.py @@ -170,6 +170,22 @@ class BlendPath(bytes): return BlendPath(os.path.join(root, my_relpath)) +def _is_windows_path(str_path: str) -> bool: + """Check if path looks like a Windows absolute path (drive letter or UNC).""" + # Drive letter: C:/ or C:\ + if ( + len(str_path) >= 3 + and str_path[0].isalpha() + and str_path[1] == ":" + and str_path[2] in "/\\" + ): + return True + # UNC: //server/share or \\server\share + if len(str_path) >= 2 and str_path[:2] in ("//", "\\\\"): + return True + return False + + def make_absolute(path: pathlib.PurePath) -> pathlib.Path: """Make the path absolute without resolving symlinks or drive letters. @@ -180,17 +196,23 @@ def make_absolute(path: pathlib.PurePath) -> pathlib.Path: - Windows Network shares that are mapped to a drive letter are NOT resolved to their UNC notation. + On non-Windows platforms, Windows-style paths (drive letters and UNC) are + normalized without prepending the CWD, preserving them for cross-platform use. + The type of the returned path is determined by the current platform. """ str_path = path.as_posix() - if len(str_path) >= 2 and str_path[0].isalpha() and str_path[1] == ":": - # This is an absolute Windows path. It must be handled with care on non-Windows platforms. - if platform.system() != "Windows": - # Normalize the POSIX-like part of the path, but leave out the drive letter. + + if _is_windows_path(str_path) and platform.system() != "Windows": + if len(str_path) >= 2 and str_path[1] == ":": + # Drive letter path: normalize the part after X: non_drive_path = str_path[2:] normalized = os.path.normpath(non_drive_path) - # Stick the drive letter back on the normalized path. return pathlib.Path(str_path[:2] + normalized) + else: + # UNC path: normalize, preserving forward-slash style + normalized = os.path.normpath(str_path.replace("\\", "/")) + return pathlib.Path(normalized) return pathlib.Path(os.path.abspath(str_path)) diff --git a/blender_asset_tracer/cli/__init__.py b/blender_asset_tracer/cli/__init__.py index f6bcd18..5fe640c 100644 --- a/blender_asset_tracer/cli/__init__.py +++ b/blender_asset_tracer/cli/__init__.py @@ -79,6 +79,7 @@ def cli_main(): blocks.add_parser(subparsers) pack.add_parser(subparsers) + pack.add_sequence_parser(subparsers) list_deps.add_parser(subparsers) version.add_parser(subparsers) diff --git a/blender_asset_tracer/cli/pack.py b/blender_asset_tracer/cli/pack.py index 19e713e..a5e9d2b 100755 --- a/blender_asset_tracer/cli/pack.py +++ b/blender_asset_tracer/cli/pack.py @@ -19,6 +19,7 @@ # (c) 2018, Blender Foundation - Sybren A. Stüvel """Create a BAT-pack for the given blend file.""" import logging +import os import pathlib import sys import typing @@ -34,7 +35,8 @@ def add_parser(subparsers): parser = subparsers.add_parser("pack", help=__doc__) parser.set_defaults(func=cli_pack) - parser.add_argument("blendfile", type=pathlib.Path, help="The Blend file to pack.") + parser.add_argument("blendfile", nargs='?', type=pathlib.Path, default=None, + help="The Blend file to pack (omit when using --sequence).") parser.add_argument( "target", type=str, @@ -97,12 +99,130 @@ def add_parser(subparsers): "path structure under the target directory. Paths in blend files are " "rewritten to relative paths within this structure.", ) + parser.add_argument( + "--sequence", + nargs="+", + type=pathlib.Path, + metavar="BLENDFILE", + help="(Deprecated: use 'pack-sequence' subcommand instead.) " + "Pack multiple blend files together, deduplicating shared dependencies.", + ) + + +def add_sequence_parser(subparsers): + """Add argparser for the pack-sequence subcommand.""" + + parser = subparsers.add_parser( + "pack-sequence", + help="Pack multiple blend files together, deduplicating shared dependencies.", + ) + parser.set_defaults(func=cli_pack_sequence) + parser.add_argument( + "blendfiles", + nargs="+", + type=pathlib.Path, + help="Blend files to pack.", + ) + parser.add_argument( + "-t", + "--target", + type=str, + required=True, + help="Target directory or ZIP file.", + ) + parser.add_argument( + "-p", + "--project", + type=pathlib.Path, + help="Root directory of your project.", + ) + parser.add_argument( + "-n", + "--noop", + default=False, + action="store_true", + help="Don't copy files, just show what would be done.", + ) + parser.add_argument( + "-e", + "--exclude", + nargs="*", + default="", + help="Space-separated list of glob patterns to exclude.", + ) + parser.add_argument( + "-c", + "--compress", + default=False, + action="store_true", + help="Compress blend files while copying.", + ) + parser.add_argument( + "-r", + "--relative-only", + default=False, + action="store_true", + help="Only pack assets referred to with a relative path.", + ) + + +def derive_common_project(bpaths: typing.List[pathlib.Path]) -> pathlib.Path: + """Derive common project directory from multiple blend file paths. + + Raises ValueError if paths span multiple drives or if common root + is the filesystem root. + """ + if len(bpaths) == 1: + return bpaths[0].parent + + try: + ppath = pathlib.Path(os.path.commonpath([p.parent for p in bpaths])) + except ValueError: + raise ValueError( + "Blend files span multiple drives or have no common path. " + "Specify the project directory explicitly with -p/--project." + ) + + if ppath == pathlib.Path(ppath.anchor): + raise ValueError( + "Computed project path is the filesystem root (%s). " + "Specify the project directory explicitly." % ppath + ) + + return ppath + + +def cli_pack_sequence(args): + """CLI entry point for pack-sequence subcommand.""" + # Synthesize args to reuse paths_from_cli and create_packer + args.blendfile = None + args.sequence = args.blendfiles + args.keep_hierarchy = True # always for sequence packing + if not hasattr(args, 'target') or args.target is None: + log.critical("No target specified. Use -t/--target.") + sys.exit(3) + + bpaths, ppath, tpath = paths_from_cli(args) + + with create_packer(args, bpaths, ppath, tpath) as packer: + packer.strategise() + try: + packer.execute() + except blender_asset_tracer.pack.transfer.FileTransferError as ex: + log.error( + "%d files couldn't be copied, starting with %s", + len(ex.files_remaining), + ex.files_remaining[0], + ) + raise SystemExit(1) def cli_pack(args): - bpath, ppath, tpath = paths_from_cli(args) + if args.sequence: + log.warning("--sequence on 'pack' is deprecated. Use 'bat pack-sequence -t TARGET FILE...' instead.") + bpaths, ppath, tpath = paths_from_cli(args) - with create_packer(args, bpath, ppath, tpath) as packer: + with create_packer(args, bpaths, ppath, tpath) as packer: packer.strategise() try: packer.execute() @@ -116,9 +236,12 @@ def cli_pack(args): def create_packer( - args, bpath: pathlib.Path, ppath: pathlib.Path, target: str + args, bpaths: typing.List[pathlib.Path], ppath: pathlib.Path, target: str ) -> pack.Packer: if target.startswith("s3:/"): + if len(bpaths) > 1: + raise ValueError("S3 uploader does not support --sequence") + if args.noop: raise ValueError("S3 uploader does not support no-op.") @@ -131,13 +254,16 @@ def create_packer( if args.keep_hierarchy: raise ValueError("S3 uploader does not support the --keep-hierarchy option") - packer = create_s3packer(bpath, ppath, pathlib.PurePosixPath(target)) + packer = create_s3packer(bpaths[0], ppath, pathlib.PurePosixPath(target)) elif ( target.startswith("shaman+http:/") or target.startswith("shaman+https:/") or target.startswith("shaman:/") ): + if len(bpaths) > 1: + raise ValueError("Shaman uploader does not support --sequence") + if args.noop: raise ValueError("Shaman uploader does not support no-op.") @@ -154,7 +280,7 @@ def create_packer( "Shaman uploader does not support the --keep-hierarchy option" ) - packer = create_shamanpacker(bpath, ppath, target) + packer = create_shamanpacker(bpaths[0], ppath, target) elif target.lower().endswith(".zip"): from blender_asset_tracer.pack import zipped @@ -163,12 +289,12 @@ def create_packer( raise ValueError("ZIP packer does not support on-the-fly compression") packer = zipped.ZipPacker( - bpath, ppath, target, noop=args.noop, relative_only=args.relative_only, + bpaths, ppath, target, noop=args.noop, relative_only=args.relative_only, keep_hierarchy=args.keep_hierarchy, ) else: packer = pack.Packer( - bpath, + bpaths, ppath, target, noop=args.noop, @@ -222,24 +348,43 @@ def create_shamanpacker( ) -def paths_from_cli(args) -> typing.Tuple[pathlib.Path, pathlib.Path, str]: - """Return paths to blendfile, project, and pack target. +def paths_from_cli(args) -> typing.Tuple[typing.List[pathlib.Path], pathlib.Path, str]: + """Return paths to blendfile(s), project, and pack target. Calls sys.exit() if anything is wrong. """ - bpath = args.blendfile - if not bpath.exists(): - log.critical("File %s does not exist", bpath) + # Collect blend files from positional and --sequence arguments. + bpaths = [] # type: typing.List[pathlib.Path] + if args.blendfile is not None: + bpaths.append(args.blendfile) + if args.sequence: + bpaths.extend(args.sequence) + if not bpaths: + log.critical("No blend file specified. Provide a positional blendfile or use --sequence.") sys.exit(3) - if bpath.is_dir(): - log.critical("%s is a directory, should be a blend file") - sys.exit(3) - bpath = bpathlib.make_absolute(bpath) + + # Deduplicate preserving order, in case the same file appears both as + # positional arg and in --sequence. + bpaths = list(dict.fromkeys(bpaths)) + + # Validate each blend file and make absolute. + for i, bpath in enumerate(bpaths): + if not bpath.exists(): + log.critical("File %s does not exist", bpath) + sys.exit(3) + if bpath.is_dir(): + log.critical("%s is a directory, should be a blend file", bpath) + sys.exit(3) + bpaths[i] = bpathlib.make_absolute(bpath) tpath = args.target if args.project is None: - ppath = bpathlib.make_absolute(bpath).parent + try: + ppath = derive_common_project(bpaths) + except ValueError as ex: + log.critical("%s", ex) + sys.exit(5) log.warning("No project path given, using %s", ppath) else: ppath = bpathlib.make_absolute(args.project) @@ -256,18 +401,20 @@ def paths_from_cli(args) -> typing.Tuple[pathlib.Path, pathlib.Path, str]: ) ppath = ppath.parent - try: - bpath.relative_to(ppath) - except ValueError: - log.critical( - "Project directory %s does not contain blend file %s", - args.project, - bpath.absolute(), - ) - sys.exit(5) + for bpath in bpaths: + try: + bpath.relative_to(ppath) + except ValueError: + log.critical( + "Project directory %s does not contain blend file %s", + ppath, + bpath, + ) + sys.exit(5) - log.info("Blend file to pack: %s", bpath) + for bpath in bpaths: + log.info("Blend file to pack: %s", bpath) log.info("Project path: %s", ppath) log.info("Pack will be created in: %s", tpath) - return bpath, ppath, tpath + return bpaths, ppath, tpath diff --git a/blender_asset_tracer/operators.py b/blender_asset_tracer/operators.py index f797c77..e45ea7a 100644 --- a/blender_asset_tracer/operators.py +++ b/blender_asset_tracer/operators.py @@ -1,4 +1,5 @@ import os +import re import sys import subprocess import tempfile @@ -6,10 +7,67 @@ import zipfile from pathlib import Path import bpy -from bpy.types import Operator +from bpy.types import Operator, PropertyGroup from bpy_extras.io_utils import ExportHelper -from blender_asset_tracer.pack import zipped +from blender_asset_tracer.pack import zipped, progress + +# Matches filenames like scene_v02.blend (single integer version only). +# Does NOT match: _v02.1.blend, _v2-1.blend, or files without _vNN suffix. +VERSION_RE = re.compile(r'_v(\d+)\.blend$', re.IGNORECASE) + + +class BlenderProgressCallback(progress.Callback): + """Progress callback that updates Blender's wm.progress cursor indicator.""" + + def __init__(self, wm): + self._wm = wm + self._assets_traced = 0 + self._total_bytes = 0 + self._transferred_bytes = 0 + self._phase = "init" + + def pack_start(self): + self._phase = "trace" + self._assets_traced = 0 + self._wm.progress_begin(0, 1000) + self._wm.progress_update(0) + print("[BAT] Tracing dependencies...") + + def trace_asset(self, filename): + self._assets_traced += 1 + # Log scale so progress bar never saturates during trace phase + # 100 assets → ~185, 1000 → ~320, 10000 → ~385 + import math + val = int(400 * (1 - 1 / (1 + math.log1p(self._assets_traced) / 10))) + self._wm.progress_update(min(val, 399)) + if self._assets_traced % 100 == 0: + print("[BAT] Traced %d assets..." % self._assets_traced) + + def transfer_file(self, src, dst): + if self._phase != "transfer": + self._phase = "transfer" + print("[BAT] Transferring files...") + + def transfer_progress(self, total_bytes, transferred_bytes): + self._total_bytes = total_bytes + self._transferred_bytes = transferred_bytes + if total_bytes > 0: + # Transfer phase: map to 400-1000 range + pct = transferred_bytes / total_bytes + val = 400 + int(pct * 600) + self._wm.progress_update(val) + + def pack_done(self, output_blendfile, missing_files): + self._wm.progress_update(1000) + self._wm.progress_end() + print("[BAT] Pack complete! (%d assets traced)" % self._assets_traced) + if missing_files: + print("[BAT] Warning: %d missing files" % len(missing_files)) + + def pack_aborted(self, reason): + self._wm.progress_end() + print("[BAT] Pack aborted: %s" % reason) class ExportBatPack(Operator, ExportHelper): @@ -25,17 +83,29 @@ class ExportBatPack(Operator, ExportHelper): outfname = bpy.path.ensure_ext(self.filepath, ".zip") self.report({"INFO"}, "Executing ZipPacker ...") - with zipped.ZipPacker( - Path(bpy.data.filepath), - Path(bpy.data.filepath).parent, - str(self.filepath), - ) as packer: - print("[BAT] Strategising (tracing dependencies)...") - packer.strategise() - print(f"[BAT] Found {len(packer._actions)} assets to process") - print("[BAT] Executing (rewriting paths and copying files)...") - packer.execute() - print("[BAT] Packing complete!") + wm = context.window_manager + progress_cb = BlenderProgressCallback(wm) + + try: + with zipped.ZipPacker( + [Path(bpy.data.filepath)], + Path(bpy.data.filepath).parent, + str(self.filepath), + ) as packer: + packer.progress_cb = progress_cb + packer.strategise() + packer.execute() + except Exception as ex: + import traceback + traceback.print_exc() + self.report({"ERROR"}, "Packing failed: %s" % str(ex)) + return {"CANCELLED"} + finally: + try: + wm.progress_end() + except Exception: + pass + self.report({"INFO"}, "Packing successful!") with zipfile.ZipFile(str(self.filepath)) as inzip: @@ -126,13 +196,24 @@ class BAT_OT_export_zip(Operator, ExportHelper): self.report({"INFO"}, "Packing with hierarchy...") - with packer_cls(bfile, project, target, keep_hierarchy=True) as packer: - print("[BAT] Strategising (tracing dependencies)...") - packer.strategise() - print(f"[BAT] Found {len(packer._actions)} assets to process") - print("[BAT] Executing (rewriting paths and copying files)...") - packer.execute() - print("[BAT] Packing complete!") + wm = context.window_manager + progress_cb = BlenderProgressCallback(wm) + + try: + with packer_cls([bfile], project, target, keep_hierarchy=True) as packer: + packer.progress_cb = progress_cb + packer.strategise() + packer.execute() + except Exception as ex: + import traceback + traceback.print_exc() + self.report({"ERROR"}, "Packing failed: %s" % str(ex)) + return {"CANCELLED"} + finally: + try: + wm.progress_end() + except Exception: + pass if self.use_zip: with zipfile.ZipFile(target) as inzip: @@ -149,11 +230,271 @@ class BAT_OT_export_zip(Operator, ExportHelper): return {"FINISHED"} +class BAT_SequenceFileEntry(PropertyGroup): + filepath: bpy.props.StringProperty(name="File Path") + enabled: bpy.props.BoolProperty(name="Enabled", default=True) + shot_name: bpy.props.StringProperty(name="Shot") + + +# Studio templates define where to find published blend files. +# Each template is a list of path segments (case-insensitive) to walk from each +# shot directory to the folder containing versioned .blend files. +STUDIO_TEMPLATES = { + 'ADM': { + 'label': "Autour de Minuit", + 'description': "Scans // for the latest versioned .blend (e.g. lighting3d, anim3d)", + 'path_segments': ["{task}"], + 'task_choices': [ + ('lighting3d', "Lighting 3D", ""), + ('anim3d', "Animation 3D", ""), + ('layout3d', "Layout 3D", ""), + ('compositing', "Compositing", ""), + ], + }, + 'LCPROD': { + 'label': "La Cabane Productions - Lamb Stew", + 'description': "Scans /03_ANIMATION/Publish/ for the latest versioned .blend", + 'path_segments': ["03_ANIMATION", "Publish"], + }, +} + +STUDIO_TEMPLATE_ITEMS = [ + (key, tpl['label'], tpl['description']) + for key, tpl in STUDIO_TEMPLATES.items() +] + +# Collect all task choices across templates that have them. +TASK_CHOICE_ITEMS = [] +_seen = set() +for _tpl in STUDIO_TEMPLATES.values(): + for item in _tpl.get('task_choices', []): + if item[0] not in _seen: + TASK_CHOICE_ITEMS.append(item) + _seen.add(item[0]) +if not TASK_CHOICE_ITEMS: + TASK_CHOICE_ITEMS.append(('NONE', "None", "No task filter")) + + +def _find_subdir_ci(parent, name): + """Find a child directory matching `name` exactly (case-insensitive).""" + name_upper = name.upper() + for child in parent.iterdir(): + if child.is_dir() and child.name.upper() == name_upper: + return child + return None + + +def find_latest_publishes(root_dir, template_key, task=''): + """Scan a sequence folder for the latest .blend in each shot using the given template. + + Returns (results, errors) where results is a list of (shot_name, filepath) tuples + and errors is a list of (shot_name, error_message) tuples for shots that could not + be scanned. + """ + template = STUDIO_TEMPLATES.get(template_key) + if not template: + return [], [] + + results = [] + errors = [] + root = Path(root_dir) + if not root.is_dir(): + return results, errors + + path_segments = [ + seg.replace("{task}", task) if "{task}" in seg and task else seg + for seg in template['path_segments'] + ] + + for shot_dir in sorted(root.iterdir()): + if not shot_dir.is_dir(): + continue + + try: + # Walk the template path segments from the shot directory + current = shot_dir + for segment in path_segments: + current = _find_subdir_ci(current, segment) + if current is None: + break + if current is None: + continue + + # Find the .blend with the highest _vNNN version number + best_version = -1 + best_file = None + for f in current.iterdir(): + if f.suffix.lower() != '.blend': + continue + m = VERSION_RE.search(f.name) + if m: + ver = int(m.group(1)) + if ver > best_version: + best_version = ver + best_file = f + if best_file: + results.append((shot_dir.name, best_file.absolute())) + except OSError as ex: + errors.append((shot_dir.name, str(ex))) + + return results, errors + + +class BAT_OT_scan_sequence(Operator): + """Scan the current file browser directory for latest published blend files""" + + bl_idname = "bat.scan_sequence" + bl_label = "Scan Current Folder" + + def execute(self, context): + wm = context.window_manager + + # Read the directory the user is currently browsing in the file selector + try: + seq_dir = context.space_data.params.directory.decode('utf-8') + except (AttributeError, UnicodeDecodeError): + seq_dir = None + + if not seq_dir: + self.report({"ERROR"}, "Could not read current file browser directory") + return {"CANCELLED"} + + wm.bat_sequence_dir = seq_dir + wm.bat_sequence_files.clear() + publishes, scan_errors = find_latest_publishes(seq_dir, wm.bat_sequence_template, wm.bat_sequence_task) + + if scan_errors: + error_summary = "; ".join("%s: %s" % (n, e) for n, e in scan_errors) + self.report({"ERROR"}, "Failed to scan %d shot(s): %s" % (len(scan_errors), error_summary)) + return {"CANCELLED"} + + if not publishes: + self.report({"WARNING"}, "No published blend files found in %s" % seq_dir) + return {"CANCELLED"} + + for shot_name, filepath in publishes: + entry = wm.bat_sequence_files.add() + entry.shot_name = shot_name + entry.filepath = str(filepath) + entry.enabled = True + + # Update the file browser filename based on the scanned folder name + folder_name = Path(seq_dir.rstrip("/\\")).name + try: + context.space_data.params.filename = folder_name + "_bat_pack.zip" + except (AttributeError, TypeError): + pass + + self.report({"INFO"}, "Found %d published blend files" % len(publishes)) + return {"FINISHED"} + + +class BAT_OT_sequence_pack(Operator, ExportHelper): + """Pack a sequence of shots with hierarchy preservation and shared asset deduplication""" + + bl_idname = "bat.sequence_pack" + bl_label = "BAT - Pack Sequence" + filename_ext = ".zip" + + filter_glob: bpy.props.StringProperty(default="*.zip", options={'HIDDEN'}) + + def invoke(self, context, event): + wm = context.window_manager + # Default sequence_dir to blend file's parent if not already set + if not wm.bat_sequence_dir and bpy.data.is_saved: + wm.bat_sequence_dir = str(Path(bpy.data.filepath).parent) + # Pre-fill filename from the last scanned directory + if wm.bat_sequence_dir: + folder_name = Path(wm.bat_sequence_dir.rstrip("/\\")).name + self.filepath = folder_name + "_bat_pack.zip" + return super().invoke(context, event) + + def draw(self, context): + layout = self.layout + wm = context.window_manager + + layout.label(text="Studio Template:") + layout.prop(wm, "bat_sequence_template", text="") + + # Show task selector if the current template has task choices + template = STUDIO_TEMPLATES.get(wm.bat_sequence_template) + if template and template.get('task_choices'): + layout.prop(wm, "bat_sequence_task", text="Task") + + layout.separator() + layout.label(text="Navigate to the sequence folder, then:") + layout.operator(BAT_OT_scan_sequence.bl_idname, icon='FILE_REFRESH') + + if wm.bat_sequence_dir: + layout.label(text="Scanned: %s" % Path(wm.bat_sequence_dir).name) + + if len(wm.bat_sequence_files) > 0: + layout.separator() + layout.label(text="Published blend files:") + box = layout.box() + for entry in wm.bat_sequence_files: + row = box.row() + row.prop(entry, "enabled", text="") + row.label(text=entry.shot_name) + row.label(text=Path(entry.filepath).name) + + def execute(self, context): + from blender_asset_tracer.pack.zipped import ZipPacker + + wm = context.window_manager + bpaths = [Path(entry.filepath) for entry in wm.bat_sequence_files if entry.enabled] + + if not bpaths: + self.report({"ERROR"}, "No blend files selected. Scan a folder first.") + return {"CANCELLED"} + + target = bpy.path.ensure_ext(self.filepath, ".zip") + + try: + project = Path(os.path.commonpath([p.parent for p in bpaths])) + except ValueError: + self.report({"ERROR"}, + "Blend files span multiple drives. Cannot determine project root.") + return {"CANCELLED"} + if project == Path(project.anchor): + self.report({"WARNING"}, + "Project root is the filesystem root (%s). " + "Consider setting a Root directory." % project) + + self.report({"INFO"}, "Packing %d blend files..." % len(bpaths)) + + progress_cb = BlenderProgressCallback(wm) + + try: + with ZipPacker(bpaths, project, target, keep_hierarchy=True) as packer: + packer.progress_cb = progress_cb + packer.strategise() + packer.execute() + except Exception as ex: + import traceback + traceback.print_exc() + self.report({"ERROR"}, "Packing failed: %s" % str(ex)) + return {"CANCELLED"} + finally: + try: + wm.progress_end() + except Exception: + pass + + with zipfile.ZipFile(target) as inzip: + inzip.testzip() + + self.report({"INFO"}, "Written to %s" % target) + open_folder(Path(target).parent) + return {"FINISHED"} + + def menu_func(self, context): layout = self.layout layout.separator() layout.operator(ExportBatPack.bl_idname) filepath = layout.operator(BAT_OT_export_zip.bl_idname) + layout.operator(BAT_OT_sequence_pack.bl_idname) try: prefs = bpy.context.preferences.addons["blender_asset_tracer"].preferences diff --git a/blender_asset_tracer/pack/__init__.py b/blender_asset_tracer/pack/__init__.py index a63d0b4..b75f28c 100755 --- a/blender_asset_tracer/pack/__init__.py +++ b/blender_asset_tracer/pack/__init__.py @@ -96,7 +96,7 @@ class Packer: def __init__( self, - bfile: pathlib.Path, + bfile: typing.Union[pathlib.Path, typing.List[pathlib.Path]], project: pathlib.Path, target: str, *, @@ -105,7 +105,11 @@ class Packer: relative_only=False, keep_hierarchy=False, ) -> None: - self.blendfile = bfile + if isinstance(bfile, list): + self.blendfiles = bfile + else: + self.blendfiles = [bfile] + self.blendfile = self.blendfiles[0] # backward compat self.project = project self.target = target self._target_path = self._make_target_path(target) @@ -136,6 +140,7 @@ class Packer: self.missing_files = set() # type: typing.Set[pathlib.Path] self._new_location_paths = set() # type: typing.Set[pathlib.Path] self._output_path = None # type: typing.Optional[pathlib.PurePath] + self._output_paths = [] # type: typing.List[pathlib.PurePath] # Filled by execute() self._file_transferer = None # type: typing.Optional[transfer.FileTransferer] @@ -172,6 +177,11 @@ class Packer: assert self._output_path is not None return self._output_path + @property + def output_paths(self) -> typing.List[pathlib.PurePath]: + """The paths of all packed blend files in the target directory.""" + return list(self._output_paths) + @property def progress_cb(self) -> progress.Callback: return self._progress_cb @@ -235,46 +245,54 @@ class Packer: in the execute() function. """ - # The blendfile that we pack is generally not its own dependency, so - # we have to explicitly add it to the _packed_paths. - bfile_path = bpathlib.make_absolute(self.blendfile) - - # Both paths have to be resolved first, because this also translates - # network shares mapped to Windows drive letters back to their UNC - # notation. Only resolving one but not the other (which can happen - # with the abosolute() call above) can cause errors. - if self.keep_hierarchy: - bfile_pp = self._target_path / bpathlib.strip_root(bfile_path) - else: - bfile_pp = self._target_path / bfile_path.relative_to( - bpathlib.make_absolute(self.project) + if len(self.blendfiles) > 50: + log.info( + "Packing %d blend files. Peak memory may be high (~%.1f GB estimated).", + len(self.blendfiles), + len(self.blendfiles) * 0.005, ) - self._output_path = bfile_pp self._progress_cb.pack_start() - - act = self._actions[bfile_path] - act.path_action = PathAction.KEEP_PATH - act.new_path = bfile_pp - - self._check_aborted() self._new_location_paths = set() - for usage in trace.deps(self.blendfile, self._progress_cb): - self._check_aborted() - asset_path = usage.abspath - if any(asset_path.match(glob) for glob in self._exclude_globs): - log.info("Excluding file: %s", asset_path) - continue + self._output_paths = [] - if self.relative_only and not usage.asset_path.startswith(b"//"): - log.info("Skipping absolute path: %s", usage.asset_path) - continue + for bf in self.blendfiles: + bfile_path = bpathlib.make_absolute(bf) - if usage.is_sequence: - self._visit_sequence(asset_path, usage) + # Both paths have to be resolved first, because this also translates + # network shares mapped to Windows drive letters back to their UNC + # notation. Only resolving one but not the other (which can happen + # with the abosolute() call above) can cause errors. + if self.keep_hierarchy: + bfile_pp = self._target_path / bpathlib.strip_root(bfile_path) else: - self._visit_asset(asset_path, usage) + bfile_pp = self._target_path / bfile_path.relative_to( + bpathlib.make_absolute(self.project) + ) + self._output_paths.append(bfile_pp) + act = self._actions[bfile_path] + act.path_action = PathAction.KEEP_PATH + act.new_path = bfile_pp + + self._check_aborted() + for usage in trace.deps(bf, self._progress_cb): + self._check_aborted() + asset_path = usage.abspath + if any(asset_path.match(glob) for glob in self._exclude_globs): + log.info("Excluding file: %s", asset_path) + continue + + if self.relative_only and not usage.asset_path.startswith(b"//"): + log.info("Skipping absolute path: %s", usage.asset_path) + continue + + if usage.is_sequence: + self._visit_sequence(asset_path, usage) + else: + self._visit_asset(asset_path, usage) + + self._output_path = self._output_paths[0] # backward compat self._find_new_paths() self._group_rewrites() @@ -644,11 +662,29 @@ class Packer: log.debug("Writing info to %s", infopath) with infopath.open("wt", encoding="utf8") as infofile: print("This is a Blender Asset Tracer pack.", file=infofile) - print("Start by opening the following blend file:", file=infofile) - print( - " %s" % self._output_path.relative_to(self._target_path).as_posix(), - file=infofile, - ) + if len(self._output_paths) > 1: + print( + "This pack contains %d blend files:" % len(self._output_paths), + file=infofile, + ) + for op in self._output_paths: + print( + " %s" % op.relative_to(self._target_path).as_posix(), + file=infofile, + ) + print( + "Total unique assets: %d" % len(self._actions), file=infofile + ) + if self.missing_files: + print( + "Missing files: %d" % len(self.missing_files), file=infofile + ) + else: + print("Start by opening the following blend file:", file=infofile) + print( + " %s" % self._output_path.relative_to(self._target_path).as_posix(), + file=infofile, + ) self._file_transferer.queue_move(infopath, self._target_path / infoname) diff --git a/blender_asset_tracer/pack/zipped.py b/blender_asset_tracer/pack/zipped.py index be1bafa..bb685b7 100644 --- a/blender_asset_tracer/pack/zipped.py +++ b/blender_asset_tracer/pack/zipped.py @@ -58,6 +58,7 @@ class ZipTransferrer(transfer.FileTransferer): zippath = self.zippath.absolute() + log.info("Writing ZIP file to %s", zippath) with zipfile.ZipFile(str(zippath), "w") as outzip: for src, dst, act in self.iter_queue(): assert src.is_absolute(), "expecting only absolute paths, not %r" % src @@ -82,10 +83,11 @@ class ZipTransferrer(transfer.FileTransferer): except Exception: # We have to catch exceptions in a broad way, as this is running in # a separate thread, and exceptions won't otherwise be seen. - log.exception("Error transferring %s to %s", src, dst) + log.exception("Error writing %s to ZIP archive at %s", src, dst) # Put the files to copy back into the queue, and abort. This allows # the main thread to inspect the queue and see which files were not # copied. The one we just failed (due to this exception) should also # be reported there. self.queue.put((src, dst, act)) return + log.info("Finished writing ZIP file: %s", zippath) diff --git a/tests/test_sequence_pack.py b/tests/test_sequence_pack.py new file mode 100644 index 0000000..2fc75e2 --- /dev/null +++ b/tests/test_sequence_pack.py @@ -0,0 +1,298 @@ +"""Tests for the pack-sequence branch features.""" +import os +import pathlib +import platform +import re +import math +import sys +import types + +import pytest + +from blender_asset_tracer import bpathlib + +# operators.py requires bpy (Blender Python), which isn't available outside Blender. +# We extract testable pure-Python functions by mocking bpy at import time. +_mock_bpy = types.ModuleType("bpy") +_mock_bpy.types = types.ModuleType("bpy.types") +_mock_bpy.props = types.ModuleType("bpy.props") +_mock_bpy.types.Operator = type("Operator", (), {}) +_mock_bpy.types.PropertyGroup = type("PropertyGroup", (), {}) +_mock_bpy.props.StringProperty = lambda **kw: "" +_mock_bpy.props.BoolProperty = lambda **kw: False +_mock_bpy.props.EnumProperty = lambda **kw: "" +_mock_bpy.props.CollectionProperty = lambda **kw: None +sys.modules.setdefault("bpy", _mock_bpy) +sys.modules.setdefault("bpy.types", _mock_bpy.types) +sys.modules.setdefault("bpy.props", _mock_bpy.props) + +_mock_extras = types.ModuleType("bpy_extras") +_mock_io = types.ModuleType("bpy_extras.io_utils") +_mock_io.ExportHelper = type("ExportHelper", (), {}) +sys.modules.setdefault("bpy_extras", _mock_extras) +sys.modules.setdefault("bpy_extras.io_utils", _mock_io) + +from blender_asset_tracer.operators import ( + _find_subdir_ci, find_latest_publishes, VERSION_RE, +) + + +# --- Fix #1: _find_subdir_ci exact match --- + +class TestFindSubdirCI: + """Test case-insensitive directory matching.""" + + def test_exact_case_match(self, tmp_path): + + (tmp_path / "Publish").mkdir() + result = _find_subdir_ci(tmp_path, "Publish") + assert result is not None + assert result.name == "Publish" + + def test_case_insensitive_match(self, tmp_path): + + (tmp_path / "PUBLISH").mkdir() + result = _find_subdir_ci(tmp_path, "publish") + assert result is not None + assert result.name == "PUBLISH" + + def test_no_prefix_fallback(self, tmp_path): + """Prefix matches like 'Publish_old' must NOT match 'Publish'.""" + + (tmp_path / "Publish_old").mkdir() + result = _find_subdir_ci(tmp_path, "Publish") + assert result is None + + def test_exact_wins_over_prefix(self, tmp_path): + + (tmp_path / "Publish_old").mkdir() + (tmp_path / "Publish").mkdir() + result = _find_subdir_ci(tmp_path, "Publish") + assert result is not None + assert result.name == "Publish" + + def test_missing_returns_none(self, tmp_path): + + (tmp_path / "Other").mkdir() + result = _find_subdir_ci(tmp_path, "Publish") + assert result is None + + def test_ignores_files(self, tmp_path): + + (tmp_path / "Publish").touch() # file, not dir + result = _find_subdir_ci(tmp_path, "Publish") + assert result is None + + +# --- Fix #2: CLI pack-sequence parsing --- + +class TestCLISequenceParsing: + """Test the pack-sequence subcommand argument parsing.""" + + def test_pack_sequence_parses_correctly(self): + import argparse + from blender_asset_tracer.cli import pack as cli_pack + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers() + cli_pack.add_sequence_parser(subparsers) + + args = parser.parse_args([ + "pack-sequence", "-t", "output.zip", + "a.blend", "b.blend", "c.blend", + ]) + assert args.target == "output.zip" + assert len(args.blendfiles) == 3 + assert args.blendfiles[0] == pathlib.Path("a.blend") + + def test_pack_sequence_requires_target(self): + import argparse + from blender_asset_tracer.cli import pack as cli_pack + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers() + cli_pack.add_sequence_parser(subparsers) + + with pytest.raises(SystemExit): + parser.parse_args(["pack-sequence", "a.blend", "b.blend"]) + + def test_pack_sequence_requires_files(self): + import argparse + from blender_asset_tracer.cli import pack as cli_pack + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers() + cli_pack.add_sequence_parser(subparsers) + + with pytest.raises(SystemExit): + parser.parse_args(["pack-sequence", "-t", "output.zip"]) + + +# --- Fix #3: Cross-platform path normalization --- + +class TestCrossPlatformPaths: + """Test Windows path handling on Linux.""" + + @pytest.mark.skipif(platform.system() == 'Windows', reason='Linux-only test') + def test_make_absolute_windows_drive_no_cwd_contamination(self): + p = pathlib.PurePosixPath('C:/Users/LaCabane/Projects/shot.blend') + result = bpathlib.make_absolute(p) + result_str = str(result) + assert 'C:' in result_str + # Must NOT prepend CWD + assert not result_str.startswith('/home') + assert not result_str.startswith('/tmp') + + @pytest.mark.skipif(platform.system() == 'Windows', reason='Linux-only test') + def test_make_absolute_unc_preserved(self): + p = pathlib.PurePosixPath('//server/share/projects/shot.blend') + result = bpathlib.make_absolute(p) + result_str = str(result) + assert result_str.startswith('//') + + @pytest.mark.skipif(platform.system() == 'Windows', reason='Linux-only test') + def test_make_absolute_windows_dotdot_normalized(self): + p = pathlib.PurePosixPath('C:/Users/LaCabane/../Shared/shot.blend') + result = bpathlib.make_absolute(p) + result_str = str(result) + assert '..' not in result_str + assert 'C:' in result_str + + def test_is_windows_path_drive(self): + assert bpathlib._is_windows_path('C:/foo') + assert bpathlib._is_windows_path('D:\\bar') + + def test_is_windows_path_unc(self): + assert bpathlib._is_windows_path('//server/share') + assert bpathlib._is_windows_path('\\\\server\\share') + + def test_is_windows_path_negative(self): + assert not bpathlib._is_windows_path('/home/user') + assert not bpathlib._is_windows_path('relative/path') + + +# --- Fix #5: derive_common_project --- + +class TestDeriveCommonProject: + """Test project root derivation from blend file paths.""" + + def test_single_file(self, tmp_path): + from blender_asset_tracer.cli.pack import derive_common_project + bf = tmp_path / "project" / "shot.blend" + bf.parent.mkdir(parents=True) + bf.touch() + result = derive_common_project([bf]) + assert result == bf.parent + + def test_multiple_files_common_parent(self, tmp_path): + from blender_asset_tracer.cli.pack import derive_common_project + (tmp_path / "shots" / "sq01").mkdir(parents=True) + (tmp_path / "shots" / "sq02").mkdir(parents=True) + bf1 = tmp_path / "shots" / "sq01" / "a.blend" + bf2 = tmp_path / "shots" / "sq02" / "b.blend" + bf1.touch() + bf2.touch() + result = derive_common_project([bf1, bf2]) + assert result == tmp_path / "shots" + + @pytest.mark.skipif(platform.system() != 'Windows', reason='Windows cross-drive test') + def test_cross_drive_raises(self): + from blender_asset_tracer.cli.pack import derive_common_project + bf1 = pathlib.Path("C:/shots/a.blend") + bf2 = pathlib.Path("D:/shots/b.blend") + with pytest.raises(ValueError, match="multiple drives"): + derive_common_project([bf1, bf2]) + + +# --- Fix #6: Progress bar log scaling --- + +class TestProgressScaling: + """Test that progress bar doesn't saturate.""" + + def test_100_assets_below_200(self): + val = int(400 * (1 - 1 / (1 + math.log1p(100) / 10))) + assert val < 200 + + def test_1000_assets_below_350(self): + val = int(400 * (1 - 1 / (1 + math.log1p(1000) / 10))) + assert val < 350 + + def test_10000_assets_below_400(self): + val = int(400 * (1 - 1 / (1 + math.log1p(10000) / 10))) + assert val < 400 + + def test_monotonically_increasing(self): + prev = 0 + for n in [1, 10, 100, 1000, 10000]: + val = int(400 * (1 - 1 / (1 + math.log1p(n) / 10))) + assert val >= prev + prev = val + + +# --- Fix #7: Scan error collection --- + +class TestScanErrorCollection: + """Test that scan errors are collected, not silently skipped.""" + + def test_unreadable_shot_collected_as_error(self, tmp_path): + + # Create a root with one readable and one unreadable shot + root = tmp_path / "sequence" + root.mkdir() + (root / "shot01" / "03_ANIMATION" / "Publish").mkdir(parents=True) + blend = root / "shot01" / "03_ANIMATION" / "Publish" / "scene_v01.blend" + blend.touch() + + # Make shot02 unreadable + bad_shot = root / "shot02" + bad_shot.mkdir() + bad_shot.chmod(0o000) + + try: + publishes, errors = find_latest_publishes(str(root), 'LCPROD') + # shot01 should be found + assert len(publishes) == 1 + assert publishes[0][0] == "shot01" + # shot02 should be in errors + assert len(errors) == 1 + assert errors[0][0] == "shot02" + finally: + bad_shot.chmod(0o755) # cleanup + + def test_clean_scan_no_errors(self, tmp_path): + + root = tmp_path / "sequence" + root.mkdir() + (root / "shot01" / "03_ANIMATION" / "Publish").mkdir(parents=True) + (root / "shot01" / "03_ANIMATION" / "Publish" / "scene_v01.blend").touch() + publishes, errors = find_latest_publishes(str(root), 'LCPROD') + assert len(publishes) == 1 + assert len(errors) == 0 + + +# --- Fix #13: VERSION_RE --- + +class TestVersionRegex: + """Test version number extraction from filenames.""" + + def test_matches_standard(self): + + m = VERSION_RE.search("scene_v02.blend") + assert m is not None + assert m.group(1) == "02" + + def test_matches_case_insensitive(self): + + m = VERSION_RE.search("SCENE_V10.BLEND") + assert m is not None + assert m.group(1) == "10" + + def test_no_match_without_v(self): + + m = VERSION_RE.search("scene_02.blend") + assert m is None + + def test_no_match_dotversion(self): + + m = VERSION_RE.search("scene_v02.1.blend") + assert m is None