diff --git a/blender_asset_tracer/pack/__init__.py b/blender_asset_tracer/pack/__init__.py index 367ef91..685e9cd 100644 --- a/blender_asset_tracer/pack/__init__.py +++ b/blender_asset_tracer/pack/__init__.py @@ -2,34 +2,16 @@ import collections import enum import functools import logging -import threading import pathlib -import queue -import shutil import typing from blender_asset_tracer import tracer, bpathlib, blendfile from blender_asset_tracer.cli import common from blender_asset_tracer.tracer import result +from . import queued_copy log = logging.getLogger(__name__) -# For copying in a different process. By using a priority queue the files -# are automatically sorted alphabetically, which means we go through all files -# in a single directory at a time. This should be faster to copy than random -# access. The order isn't guaranteed, though, as we're not waiting around for -# all file paths to be known before copying starts. -file_copy_queue = queue.PriorityQueue() -file_copy_done = threading.Event() - - -class FileCopyError(IOError): - """Raised when one or more files could not be copied.""" - - def __init__(self, message, files_not_copied: typing.List[pathlib.Path]): - super().__init__(message) - self.files_not_copied = files_not_copied - class PathAction(enum.Enum): KEEP_PATH = 1 @@ -54,7 +36,6 @@ class Packer: self.target = target self.noop = noop - self._already_copied = set() self._shorten = functools.partial(common.shorten, self.project) if noop: @@ -64,6 +45,9 @@ class Packer: self._actions = collections.defaultdict(AssetAction) self._rewrites = collections.defaultdict(list) + # Number of files we would copy, if not for --noop + self._file_count = 0 + def strategise(self): """Determine what to do with the assets. 
@@ -158,29 +142,17 @@ class Packer: """ log.info('Executing %d copy actions', len(self._actions)) - t = threading.Thread(target=copy_queued) + fc = queued_copy.FileCopier() if not self.noop: - t.start() + fc.start() for asset_path, action in self._actions.items(): - self._copy_asset_and_deps(asset_path, action) + self._copy_asset_and_deps(asset_path, action, fc) if self.noop: - log.info('Would copy %d files to %s', len(self._already_copied), self.target) + log.info('Would copy %d files to %s', self._file_count, self.target) return - - file_copy_done.set() - t.join() - - if not file_copy_queue.empty(): - # Flush the queue so that we can report which files weren't copied yet. - files_remaining = [] - while not file_copy_queue.empty(): - src, dst = file_copy_queue.get_nowait() - files_remaining.append(src) - assert files_remaining - raise FileCopyError("%d files couldn't be copied" % len(files_remaining), - files_remaining) + fc.done_and_join() def _rewrite_paths(self): """Rewrite paths to the new location of the assets.""" @@ -229,12 +201,13 @@ class Packer: written = block.set(usage.path_full_field.name.name_only, relpath) log.info(' - written %d bytes', written) - def _copy_asset_and_deps(self, asset_path: pathlib.Path, action: AssetAction): + def _copy_asset_and_deps(self, asset_path: pathlib.Path, action: AssetAction, + fc: queued_copy.FileCopier): log.debug('Queueing copy of %s and dependencies', asset_path) # Copy the asset itself. packed_path = self._actions[asset_path].new_path - self._copy_to_target(asset_path, packed_path) + self._copy_to_target(asset_path, packed_path, fc) # Copy its sequence dependencies. for usage in action.usages: @@ -250,57 +223,14 @@ class Packer: packed_base_dir = first_pp.parent for file_path in usage.files(): packed_path = packed_base_dir / file_path.name - self._copy_to_target(file_path, packed_path) + self._copy_to_target(file_path, packed_path, fc) # Assumption: all data blocks using this asset use it the same way. 
break - def _copy_to_target(self, asset_path: pathlib.Path, target: pathlib.Path): + def _copy_to_target(self, asset_path: pathlib.Path, target: pathlib.Path, fc): if self.noop: print('%s → %s' % (asset_path, target)) + self._file_count += 1 return - file_copy_queue.put((asset_path, target)) - - -def copy_queued(): - my_log = log.getChild('copy_queued') - files_copied = 0 - files_skipped = 0 - - while True: - try: - src, dst = file_copy_queue.get(timeout=0.1) - except queue.Empty: - if file_copy_done.is_set(): - break - continue - - try: - if dst.exists(): - st_src = src.stat() - st_dst = dst.stat() - if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime: - my_log.info('Skipping %s; already exists', src) - files_skipped += 1 - continue - - my_log.info('Copying %s → %s', src, dst) - dst.parent.mkdir(parents=True, exist_ok=True) - # TODO(Sybren): when we target Py 3.6+, remove the str() calls. - shutil.copy(str(src), str(dst)) - files_copied += 1 - except Exception: - # We have to catch exceptions in a broad way, as this is running in - # a separate thread, and exceptions won't otherwise be seen. - my_log.exception('Error copying %s to %s', src, dst) - # Put the files to copy back into the queue, and abort. This allows - # the main thread to inspect the queue and see which files were not - # copied. The one we just failed (due to this exception) should also - # be reported there. 
-            file_copy_queue.put((src, dst))
-            return
-
-    if files_copied:
-        my_log.info('Copied %d files', files_copied)
-    if files_skipped:
-        my_log.info('Skipped %d files', files_skipped)
+        fc.queue(asset_path, target)
diff --git a/blender_asset_tracer/pack/queued_copy.py b/blender_asset_tracer/pack/queued_copy.py
new file mode 100644
index 0000000..e2e6319
--- /dev/null
+++ b/blender_asset_tracer/pack/queued_copy.py
@@ -0,0 +1,97 @@
+import logging
+import threading
+import pathlib
+import queue
+import shutil
+import typing
+
+log = logging.getLogger(__name__)
+
+
+class FileCopyError(IOError):
+    """Raised when one or more files could not be copied."""
+
+    def __init__(self, message, files_not_copied: typing.List[pathlib.Path]):
+        super().__init__(message)
+        self.files_not_copied = files_not_copied
+
+
+class FileCopier(threading.Thread):
+    """Copies files in directory order."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # For copying in a different thread. By using a priority queue the files
+        # are automatically sorted alphabetically, which means we go through all files
+        # in a single directory at a time. This should be faster to copy than random
+        # access. The order isn't guaranteed, though, as we're not waiting around for
+        # all file paths to be known before copying starts.
+
+        # maxsize=100 is just a guess as to a reasonable upper limit. When this limit
+        # is reached, the main thread will simply block while waiting for this thread
+        # to finish copying a file.
+ self.file_copy_queue = queue.PriorityQueue(maxsize=100) + self.file_copy_done = threading.Event() + + def queue(self, src: pathlib.Path, dst: pathlib.Path): + """Queue a copy action from 'src' to 'dst'.""" + self.file_copy_queue.put((src, dst)) + + def done_and_join(self): + """Indicate all files have been queued, and wait until done.""" + + self.file_copy_done.set() + self.join() + + if not self.file_copy_queue.empty(): + # Flush the queue so that we can report which files weren't copied yet. + files_remaining = [] + while not self.file_copy_queue.empty(): + src, dst = self.file_copy_queue.get_nowait() + files_remaining.append(src) + assert files_remaining + raise FileCopyError("%d files couldn't be copied" % len(files_remaining), + files_remaining) + + def run(self): + files_copied = 0 + files_skipped = 0 + + while True: + try: + src, dst = self.file_copy_queue.get(timeout=0.1) + except queue.Empty: + if self.file_copy_done.is_set(): + break + continue + + try: + if dst.exists(): + st_src = src.stat() + st_dst = dst.stat() + if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime: + log.info('Skipping %s; already exists', src) + files_skipped += 1 + continue + + log.info('Copying %s → %s', src, dst) + dst.parent.mkdir(parents=True, exist_ok=True) + # TODO(Sybren): when we target Py 3.6+, remove the str() calls. + shutil.copy(str(src), str(dst)) + files_copied += 1 + except Exception: + # We have to catch exceptions in a broad way, as this is running in + # a separate thread, and exceptions won't otherwise be seen. + log.exception('Error copying %s to %s', src, dst) + # Put the files to copy back into the queue, and abort. This allows + # the main thread to inspect the queue and see which files were not + # copied. The one we just failed (due to this exception) should also + # be reported there. 
+ self.file_copy_queue.put((src, dst)) + return + + if files_copied: + log.info('Copied %d files', files_copied) + if files_skipped: + log.info('Skipped %d files', files_skipped)