From f7150c0d29685c48d3b254fe35810d4b8d53d5d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 16 Mar 2018 12:06:13 +0100 Subject: [PATCH] Allow aborting a pack operation. For this to work well I also had to remove the sorting of blocks in trace.deps(). The sorting caused the first `yield` to be executed only after each blend file was opened, which means that the consuming for-loop takes a long time to hit its first iteration. As a result, it would respond slowly to abort requests. By not sorting the first `yield` is much sooner, resolving this issue. --- blender_asset_tracer/pack/__init__.py | 60 +++++++++++++++++++++++--- blender_asset_tracer/pack/progress.py | 3 ++ blender_asset_tracer/pack/s3.py | 2 +- blender_asset_tracer/pack/transfer.py | 15 ++++--- blender_asset_tracer/trace/__init__.py | 5 +-- tests/test_pack.py | 34 +++++++++++++++ 6 files changed, 103 insertions(+), 16 deletions(-) diff --git a/blender_asset_tracer/pack/__init__.py b/blender_asset_tracer/pack/__init__.py index 5c65846..4cbf234 100644 --- a/blender_asset_tracer/pack/__init__.py +++ b/blender_asset_tracer/pack/__init__.py @@ -4,6 +4,7 @@ import functools import logging import pathlib import tempfile +import threading import typing from blender_asset_tracer import trace, bpathlib, blendfile @@ -51,6 +52,13 @@ class AssetAction: """ +class Aborted(RuntimeError): + """Raised by Packer to abort the packing process. + + See the Packer.abort() function. + """ + + class Packer: def __init__(self, bfile: pathlib.Path, @@ -61,6 +69,8 @@ class Packer: self.project = project self.target = target self.noop = noop + self._aborted = threading.Event() + self._abort_lock = threading.RLock() # Set this to a custom Callback() subclass instance before calling # strategise() to receive progress reports. @@ -82,6 +92,9 @@ class Packer: self._new_location_paths = set() # type: typing.Set[pathlib.Path] self._output_path = None # type: pathlib.Path + # Filled by execute() + self._file_transferer = None # type: transfer.FileTransferer + # Number of files we would copy, if not for --noop self._file_count = 0 @@ -113,6 +126,30 @@ class Packer: self._progress_cb = new_progress_cb self._tscb = progress.ThreadSafeCallback(self._progress_cb) + def abort(self) -> None: + """Aborts the current packing process. + + Can be called from any thread. Aborts as soon as the running strategise + or execute function gets control over the execution flow, by raising + an Aborted exception. + """ + with self._abort_lock: + if self._file_transferer: + self._file_transferer.abort() + self._aborted.set() + + def _check_aborted(self) -> None: + """Raises an Aborted exception when abort() was called.""" + + with self._abort_lock: + if not self._aborted.is_set(): + return + + log.warning('Aborting') + self._tscb.flush() + self._progress_cb.pack_aborted() + raise Aborted() + def exclude(self, *globs: str): """Register glob-compatible patterns of files that should be ignored.""" self._exclude_globs.update(globs) @@ -137,8 +174,10 @@ class Packer: act.path_action = PathAction.KEEP_PATH act.new_path = bfile_pp + self._check_aborted() self._new_location_paths = set() for usage in trace.deps(self.blendfile, self._progress_cb): + self._check_aborted() asset_path = usage.abspath if any(asset_path.match(glob) for glob in self._exclude_globs): log.info('Excluding file: %s', asset_path) @@ -239,25 +278,32 @@ class Packer: """ log.debug('Executing %d copy actions', len(self._actions)) - ft = self._create_file_transferer() - ft.progress_cb = self._tscb + self._file_transferer = self._create_file_transferer() + self._file_transferer.progress_cb = self._tscb if not self.noop: - ft.start() + self._file_transferer.start() try: for asset_path, action in self._actions.items(): - self._copy_asset_and_deps(asset_path, action, ft) + self._check_aborted() + self._copy_asset_and_deps(asset_path, action, self._file_transferer) if self.noop: log.info('Would copy %d files to %s', self._file_count, self.target) return - ft.done_and_join() + self._file_transferer.done_and_join() except KeyboardInterrupt: log.info('File transfer interrupted with Ctrl+C, aborting.') - ft.abort_and_join() + self._file_transferer.abort_and_join() raise finally: self._tscb.flush() + self._check_aborted() + + # Make sure that the file transferer is no longer usable, for + # example to avoid it being involved in any following call to + # self.abort(). + self._file_transferer = None def _rewrite_paths(self) -> None: """Rewrite paths to the new location of the assets. @@ -268,6 +314,7 @@ class Packer: for bfile_path, action in self._actions.items(): if not action.rewrites: continue + self._check_aborted() assert isinstance(bfile_path, pathlib.Path) # bfile_pp is the final path of this blend file in the BAT pack. @@ -292,6 +339,7 @@ class Packer: bfile.copy_and_rebind(bfile_tp, mode='rb+') for usage in action.rewrites: + self._check_aborted() assert isinstance(usage, result.BlockUsage) asset_pp = self._actions[usage.abspath].new_path assert isinstance(asset_pp, pathlib.Path) diff --git a/blender_asset_tracer/pack/progress.py b/blender_asset_tracer/pack/progress.py index 2f8c91b..92932f2 100644 --- a/blender_asset_tracer/pack/progress.py +++ b/blender_asset_tracer/pack/progress.py @@ -23,6 +23,9 @@ class Callback(blender_asset_tracer.trace.progress.Callback): missing_files: typing.Set[pathlib.Path]) -> None: """Called when packing is done.""" + def pack_aborted(self): + """Called when packing was aborted.""" + def trace_blendfile(self, filename: pathlib.Path) -> None: """Called for every blendfile opened when tracing dependencies.""" diff --git a/blender_asset_tracer/pack/s3.py b/blender_asset_tracer/pack/s3.py index 697011a..aa64100 100644 --- a/blender_asset_tracer/pack/s3.py +++ b/blender_asset_tracer/pack/s3.py @@ -123,7 +123,7 @@ class S3Transferrer(transfer.FileTransferer): return True def _upload_callback(self, bytes_uploaded: int): - if self.abort.is_set(): + if self._abort.is_set(): log.warning('Interrupting ongoing upload') raise self.AbortUpload('interrupting ongoing upload') diff --git a/blender_asset_tracer/pack/transfer.py b/blender_asset_tracer/pack/transfer.py index 36f3fb5..0249b20 100644 --- a/blender_asset_tracer/pack/transfer.py +++ b/blender_asset_tracer/pack/transfer.py @@ -50,7 +50,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): # to finish copying a file. self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem] self.done = threading.Event() - self.abort = threading.Event() + self._abort = threading.Event() # Instantiate a dummy progress callback so that we can call it # without checking for None all the time. @@ -65,14 +65,14 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): def queue_copy(self, src: pathlib.Path, dst: pathlib.Path): """Queue a copy action from 'src' to 'dst'.""" assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' - assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called' + assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called' self.queue.put((src, dst, Action.COPY)) self.total_queued_bytes += src.stat().st_size def queue_move(self, src: pathlib.Path, dst: pathlib.Path): """Queue a move action from 'src' to 'dst'.""" assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' - assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called' + assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called' self.queue.put((src, dst, Action.MOVE)) self.total_queued_bytes += src.stat().st_size @@ -111,10 +111,15 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): files_remaining.append(src) return files_remaining + def abort(self) -> None: + """Abort the file transfer, immediately returns.""" + log.info('Aborting') + self._abort.set() + def abort_and_join(self) -> None: """Abort the file transfer, and wait until done.""" - self.abort.set() + self.abort() self.join() files_remaining = self._files_remaining() @@ -127,7 +132,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): """Generator, yield queued items until the work is done.""" while True: - if self.abort.is_set(): + if self._abort.is_set(): return try: diff --git a/blender_asset_tracer/trace/__init__.py b/blender_asset_tracer/trace/__init__.py index f28ba8d..d2dc341 100644 --- a/blender_asset_tracer/trace/__init__.py +++ b/blender_asset_tracer/trace/__init__.py @@ -32,10 +32,7 @@ def deps(bfilepath: pathlib.Path, progress_cb: typing.Optional[progress.Callback if progress_cb: bi.progress_cb = progress_cb - ahb = asset_holding_blocks(bi.iter_blocks(bfile)) - # Sort the asset-holding blocks so that we can iterate over them - # in disk order, which is slightly faster than random order. - for block in sorted(ahb): + for block in asset_holding_blocks(bi.iter_blocks(bfile)): yield from blocks2assets.iter_assets(block) diff --git a/tests/test_pack.py b/tests/test_pack.py index f305b22..ac40159 100644 --- a/tests/test_pack.py +++ b/tests/test_pack.py @@ -353,3 +353,37 @@ class ProgressTest(AbstractPackTest): ] cb.missing_file.assert_has_calls(expected_calls, any_order=True) self.assertEqual(len(expected_calls), cb.missing_file.call_count) + + +class AbortTest(AbstractPackTest): + def test_abort_strategise(self): + infile = self.blendfiles / 'subdir/doubly_linked_up.blend' + packer = pack.Packer(infile, self.blendfiles, self.tpath) + + class AbortingCallback(progress.Callback): + def trace_blendfile(self, filename: pathlib.Path): + # Call abort() somewhere during the strategise() call. + if filename.name == 'linked_cube.blend': + packer.abort() + + packer.progress_cb = AbortingCallback() + with packer, self.assertRaises(pack.Aborted): + packer.strategise() + + def test_abort_transfer(self): + infile = self.blendfiles / 'subdir/doubly_linked_up.blend' + packer = pack.Packer(infile, self.blendfiles, self.tpath) + + first_file_size = infile.stat().st_size + + class AbortingCallback(progress.Callback): + def transfer_progress(self, total_bytes: int, transferred_bytes: int): + # Call abort() somewhere during the file transfer. + if total_bytes > first_file_size * 1.1: + packer.abort() + + packer.progress_cb = AbortingCallback() + with packer: + packer.strategise() + with self.assertRaises(pack.Aborted): + packer.execute()