diff --git a/blender_asset_tracer/pack/filesystem.py b/blender_asset_tracer/pack/filesystem.py index 8e5ae06..6006a03 100644 --- a/blender_asset_tracer/pack/filesystem.py +++ b/blender_asset_tracer/pack/filesystem.py @@ -18,7 +18,10 @@ # # (c) 2018, Blender Foundation - Sybren A. Stüvel import logging +import multiprocessing.pool +import os import pathlib +import queue import shutil import typing @@ -28,60 +31,151 @@ from . import transfer log = logging.getLogger(__name__) +class FileCopierPool(multiprocessing.pool.ThreadPool): + """Thread pool that doesn't immediately queue all items. + + The default (Thread)Pool class will always have a non-blocking + apply_async() function. This means that while we're looping over + the queue of files to copy, they are all popped off that queue + and turned into async tasks. That means that when there is an + error, the queue is empty and we don't have a reliable + "these files were not transferred" list. + + This class will block when all threads are in use. + """ + def __init__(self, processes=None, initializer=None, initargs=()): + if processes is None: + processes = os.cpu_count() or 1 + if processes < 1: + raise ValueError("Number of processes must be at least 1") + self._processes = processes + + super().__init__(processes, initializer, initargs) + self._taskqueue = queue.Queue(maxsize=processes) + + def _setup_queues(self): + self._inqueue = queue.Queue(maxsize=self._processes) + self._outqueue = queue.Queue(maxsize=self._processes) + self._quick_put = self._inqueue.put + self._quick_get = self._outqueue.get + + +class AbortTransfer(Exception): + """Raised when an error was detected and file transfer should be aborted.""" + + class FileCopier(transfer.FileTransferer): """Copies or moves files in source directory order.""" + # When we don't compress the files, the process is I/O bound, + # and thrashing the storage by using multiple threads will + # only slow things down. 
+ transfer_threads = 1 # type: typing.Optional[int] + def __init__(self): super().__init__() self.files_transferred = 0 self.files_skipped = 0 self.already_copied = set() - def run(self) -> None: - # (is_dir, action) - transfer_funcs = { + self.transfer_funcs = { (False, transfer.Action.COPY): self.copyfile, (True, transfer.Action.COPY): self.copytree, (False, transfer.Action.MOVE): self.move, (True, transfer.Action.MOVE): self.move, } + def run(self) -> None: + + pool = multiprocessing.pool.ThreadPool(processes=self.transfer_threads) + for src, dst, act in self.iter_queue(): try: - st_src = src.stat() # must exist, or it wouldn't be queued. - if dst.exists(): - st_dst = dst.stat() - if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime: - log.info('SKIP %s; already exists', src) - if act == transfer.Action.MOVE: - log.debug('Deleting %s', src) - src.unlink() - self.files_skipped += 1 - continue + if self._error.is_set() or self._abort.is_set(): + raise AbortTransfer() - log.info('%s %s → %s', act.name, src, dst) + if self._skip_file(src, dst, act): + continue + + # We want to do this in this thread, as it's not thread safe itself. dst.parent.mkdir(parents=True, exist_ok=True) - tfunc = transfer_funcs[src.is_dir(), act] - tfunc(src, dst) # type: ignore - except Exception: + pool.apply_async(self._thread, (src, dst, act)) + except AbortTransfer: + # either self._error or self._abort is already set. We just have to + # let the system know we didn't handle those files yet. + self.queue.put((src, dst, act), timeout=1.0) + except Exception as ex: # We have to catch exceptions in a broad way, as this is running in # a separate thread, and exceptions won't otherwise be seen. 
- log.exception('Error transferring %s to %s', src, dst) + if self._abort.is_set(): + log.debug('Error transferring %s to %s: %s', src, dst, ex) + else: + log.exception('Error transferring %s to %s', src, dst) + self._error.set() # Put the files to copy back into the queue, and abort. This allows # the main thread to inspect the queue and see which files were not # copied. The one we just failed (due to this exception) should also # be reported there. self.queue.put((src, dst, act), timeout=1.0) - self._error.set() break + log.debug('All transfer threads queued') + pool.close() + log.debug('Waiting for transfer threads to finish') + pool.join() + log.debug('All transfer threads finished') + if self.files_transferred: log.info('Transferred %d files', self.files_transferred) if self.files_skipped: log.info('Skipped %d files', self.files_skipped) + def _thread(self, src: pathlib.Path, dst: pathlib.Path, act: transfer.Action): + try: + tfunc = self.transfer_funcs[src.is_dir(), act] + + if self._error.is_set() or self._abort.is_set(): + raise AbortTransfer() + + log.info('%s %s → %s', act.name, src, dst) + tfunc(src, dst) + except AbortTransfer: + # either self._error or self._abort is already set. We just have to + # let the system know we didn't handle those files yet. + self.queue.put((src, dst, act), timeout=1.0) + except Exception as ex: + # We have to catch exceptions in a broad way, as this is running in + # a separate thread, and exceptions won't otherwise be seen. + if self._abort.is_set(): + log.debug('Error transferring %s to %s: %s', src, dst, ex) + else: + log.exception('Error transferring %s to %s', src, dst) + self._error.set() + # Put the files to copy back into the queue, and abort. This allows + # the main thread to inspect the queue and see which files were not + # copied. The one we just failed (due to this exception) should also + # be reported there. 
+ self.queue.put((src, dst, act), timeout=1.0) + + def _skip_file(self, src: pathlib.Path, dst: pathlib.Path, act: transfer.Action) -> bool: + """Skip this file (return True) or not (return False).""" + st_src = src.stat() # must exist, or it wouldn't be queued. + if not dst.exists(): + return False + + st_dst = dst.stat() + if st_dst.st_size != st_src.st_size or st_dst.st_mtime < st_src.st_mtime: + return False + + log.info('SKIP %s; already exists', src) + if act == transfer.Action.MOVE: + log.debug('Deleting %s', src) + src.unlink() + self.files_skipped += 1 + return True + def _move(self, srcpath: pathlib.Path, dstpath: pathlib.Path): """Low-level file move""" shutil.move(str(srcpath), str(dstpath)) @@ -140,9 +234,15 @@ class FileCopier(transfer.FileTransferer): log.debug('SKIP %s; already copied', src) return + if self._error.is_set() or self._abort.is_set(): + raise AbortTransfer() + dst.mkdir(parents=True, exist_ok=True) errors = [] # type: typing.List[typing.Tuple[pathlib.Path, pathlib.Path, str]] for srcpath in src.iterdir(): + if self._error.is_set() or self._abort.is_set(): + raise AbortTransfer() + dstpath = dst / srcpath.name try: if srcpath.is_symlink(): @@ -188,6 +288,11 @@ class FileCopier(transfer.FileTransferer): class CompressedFileCopier(FileCopier): + # When we compress the files on the fly, the process is CPU-bound + # so we benefit greatly by multi-threading (packing a Spring scene + # lighting file took 6m30s single-threaded and 2min13 multi-threaded). + transfer_threads = None # type: typing.Optional[int] + def _move(self, srcpath: pathlib.Path, dstpath: pathlib.Path): compressor.move(srcpath, dstpath)