Allow aborting a pack operation.

For this to work well I also had to remove the sorting of blocks in
trace.deps(). The sorting caused the first `yield` to be executed only
after each blend file was opened, which means that the consuming for-loop
takes a long time to hit its first iteration. As a result, it would respond
slowly to abort requests. By not sorting the first `yield` is much sooner,
resolving this issue.
This commit is contained in:
Sybren A. Stüvel 2018-03-16 12:06:13 +01:00
parent 4f4a67784d
commit f7150c0d29
6 changed files with 103 additions and 16 deletions

View File

@ -4,6 +4,7 @@ import functools
import logging import logging
import pathlib import pathlib
import tempfile import tempfile
import threading
import typing import typing
from blender_asset_tracer import trace, bpathlib, blendfile from blender_asset_tracer import trace, bpathlib, blendfile
@ -51,6 +52,13 @@ class AssetAction:
""" """
class Aborted(RuntimeError):
"""Raised by Packer to abort the packing process.
See the Packer.abort() function.
"""
class Packer: class Packer:
def __init__(self, def __init__(self,
bfile: pathlib.Path, bfile: pathlib.Path,
@ -61,6 +69,8 @@ class Packer:
self.project = project self.project = project
self.target = target self.target = target
self.noop = noop self.noop = noop
self._aborted = threading.Event()
self._abort_lock = threading.RLock()
# Set this to a custom Callback() subclass instance before calling # Set this to a custom Callback() subclass instance before calling
# strategise() to receive progress reports. # strategise() to receive progress reports.
@ -82,6 +92,9 @@ class Packer:
self._new_location_paths = set() # type: typing.Set[pathlib.Path] self._new_location_paths = set() # type: typing.Set[pathlib.Path]
self._output_path = None # type: pathlib.Path self._output_path = None # type: pathlib.Path
# Filled by execute()
self._file_transferer = None # type: transfer.FileTransferer
# Number of files we would copy, if not for --noop # Number of files we would copy, if not for --noop
self._file_count = 0 self._file_count = 0
@ -113,6 +126,30 @@ class Packer:
self._progress_cb = new_progress_cb self._progress_cb = new_progress_cb
self._tscb = progress.ThreadSafeCallback(self._progress_cb) self._tscb = progress.ThreadSafeCallback(self._progress_cb)
def abort(self) -> None:
"""Aborts the current packing process.
Can be called from any thread. Aborts as soon as the running strategise
or execute function gets control over the execution flow, by raising
an Aborted exception.
"""
with self._abort_lock:
if self._file_transferer:
self._file_transferer.abort()
self._aborted.set()
def _check_aborted(self) -> None:
"""Raises an Aborted exception when abort() was called."""
with self._abort_lock:
if not self._aborted.is_set():
return
log.warning('Aborting')
self._tscb.flush()
self._progress_cb.pack_aborted()
raise Aborted()
def exclude(self, *globs: str): def exclude(self, *globs: str):
"""Register glob-compatible patterns of files that should be ignored.""" """Register glob-compatible patterns of files that should be ignored."""
self._exclude_globs.update(globs) self._exclude_globs.update(globs)
@ -137,8 +174,10 @@ class Packer:
act.path_action = PathAction.KEEP_PATH act.path_action = PathAction.KEEP_PATH
act.new_path = bfile_pp act.new_path = bfile_pp
self._check_aborted()
self._new_location_paths = set() self._new_location_paths = set()
for usage in trace.deps(self.blendfile, self._progress_cb): for usage in trace.deps(self.blendfile, self._progress_cb):
self._check_aborted()
asset_path = usage.abspath asset_path = usage.abspath
if any(asset_path.match(glob) for glob in self._exclude_globs): if any(asset_path.match(glob) for glob in self._exclude_globs):
log.info('Excluding file: %s', asset_path) log.info('Excluding file: %s', asset_path)
@ -239,25 +278,32 @@ class Packer:
""" """
log.debug('Executing %d copy actions', len(self._actions)) log.debug('Executing %d copy actions', len(self._actions))
ft = self._create_file_transferer() self._file_transferer = self._create_file_transferer()
ft.progress_cb = self._tscb self._file_transferer.progress_cb = self._tscb
if not self.noop: if not self.noop:
ft.start() self._file_transferer.start()
try: try:
for asset_path, action in self._actions.items(): for asset_path, action in self._actions.items():
self._copy_asset_and_deps(asset_path, action, ft) self._check_aborted()
self._copy_asset_and_deps(asset_path, action, self._file_transferer)
if self.noop: if self.noop:
log.info('Would copy %d files to %s', self._file_count, self.target) log.info('Would copy %d files to %s', self._file_count, self.target)
return return
ft.done_and_join() self._file_transferer.done_and_join()
except KeyboardInterrupt: except KeyboardInterrupt:
log.info('File transfer interrupted with Ctrl+C, aborting.') log.info('File transfer interrupted with Ctrl+C, aborting.')
ft.abort_and_join() self._file_transferer.abort_and_join()
raise raise
finally: finally:
self._tscb.flush() self._tscb.flush()
self._check_aborted()
# Make sure that the file transferer is no longer usable, for
# example to avoid it being involved in any following call to
# self.abort().
self._file_transferer = None
def _rewrite_paths(self) -> None: def _rewrite_paths(self) -> None:
"""Rewrite paths to the new location of the assets. """Rewrite paths to the new location of the assets.
@ -268,6 +314,7 @@ class Packer:
for bfile_path, action in self._actions.items(): for bfile_path, action in self._actions.items():
if not action.rewrites: if not action.rewrites:
continue continue
self._check_aborted()
assert isinstance(bfile_path, pathlib.Path) assert isinstance(bfile_path, pathlib.Path)
# bfile_pp is the final path of this blend file in the BAT pack. # bfile_pp is the final path of this blend file in the BAT pack.
@ -292,6 +339,7 @@ class Packer:
bfile.copy_and_rebind(bfile_tp, mode='rb+') bfile.copy_and_rebind(bfile_tp, mode='rb+')
for usage in action.rewrites: for usage in action.rewrites:
self._check_aborted()
assert isinstance(usage, result.BlockUsage) assert isinstance(usage, result.BlockUsage)
asset_pp = self._actions[usage.abspath].new_path asset_pp = self._actions[usage.abspath].new_path
assert isinstance(asset_pp, pathlib.Path) assert isinstance(asset_pp, pathlib.Path)

View File

@ -23,6 +23,9 @@ class Callback(blender_asset_tracer.trace.progress.Callback):
missing_files: typing.Set[pathlib.Path]) -> None: missing_files: typing.Set[pathlib.Path]) -> None:
"""Called when packing is done.""" """Called when packing is done."""
def pack_aborted(self):
"""Called when packing was aborted."""
def trace_blendfile(self, filename: pathlib.Path) -> None: def trace_blendfile(self, filename: pathlib.Path) -> None:
"""Called for every blendfile opened when tracing dependencies.""" """Called for every blendfile opened when tracing dependencies."""

View File

@ -123,7 +123,7 @@ class S3Transferrer(transfer.FileTransferer):
return True return True
def _upload_callback(self, bytes_uploaded: int): def _upload_callback(self, bytes_uploaded: int):
if self.abort.is_set(): if self._abort.is_set():
log.warning('Interrupting ongoing upload') log.warning('Interrupting ongoing upload')
raise self.AbortUpload('interrupting ongoing upload') raise self.AbortUpload('interrupting ongoing upload')

View File

@ -50,7 +50,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
# to finish copying a file. # to finish copying a file.
self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem] self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem]
self.done = threading.Event() self.done = threading.Event()
self.abort = threading.Event() self._abort = threading.Event()
# Instantiate a dummy progress callback so that we can call it # Instantiate a dummy progress callback so that we can call it
# without checking for None all the time. # without checking for None all the time.
@ -65,14 +65,14 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
def queue_copy(self, src: pathlib.Path, dst: pathlib.Path): def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a copy action from 'src' to 'dst'.""" """Queue a copy action from 'src' to 'dst'."""
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called' assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
self.queue.put((src, dst, Action.COPY)) self.queue.put((src, dst, Action.COPY))
self.total_queued_bytes += src.stat().st_size self.total_queued_bytes += src.stat().st_size
def queue_move(self, src: pathlib.Path, dst: pathlib.Path): def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a move action from 'src' to 'dst'.""" """Queue a move action from 'src' to 'dst'."""
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called' assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
self.queue.put((src, dst, Action.MOVE)) self.queue.put((src, dst, Action.MOVE))
self.total_queued_bytes += src.stat().st_size self.total_queued_bytes += src.stat().st_size
@ -111,10 +111,15 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
files_remaining.append(src) files_remaining.append(src)
return files_remaining return files_remaining
def abort(self) -> None:
"""Abort the file transfer, immediately returns."""
log.info('Aborting')
self._abort.set()
def abort_and_join(self) -> None: def abort_and_join(self) -> None:
"""Abort the file transfer, and wait until done.""" """Abort the file transfer, and wait until done."""
self.abort.set() self.abort()
self.join() self.join()
files_remaining = self._files_remaining() files_remaining = self._files_remaining()
@ -127,7 +132,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
"""Generator, yield queued items until the work is done.""" """Generator, yield queued items until the work is done."""
while True: while True:
if self.abort.is_set(): if self._abort.is_set():
return return
try: try:

View File

@ -32,10 +32,7 @@ def deps(bfilepath: pathlib.Path, progress_cb: typing.Optional[progress.Callback
if progress_cb: if progress_cb:
bi.progress_cb = progress_cb bi.progress_cb = progress_cb
ahb = asset_holding_blocks(bi.iter_blocks(bfile)) for block in asset_holding_blocks(bi.iter_blocks(bfile)):
# Sort the asset-holding blocks so that we can iterate over them
# in disk order, which is slightly faster than random order.
for block in sorted(ahb):
yield from blocks2assets.iter_assets(block) yield from blocks2assets.iter_assets(block)

View File

@ -353,3 +353,37 @@ class ProgressTest(AbstractPackTest):
] ]
cb.missing_file.assert_has_calls(expected_calls, any_order=True) cb.missing_file.assert_has_calls(expected_calls, any_order=True)
self.assertEqual(len(expected_calls), cb.missing_file.call_count) self.assertEqual(len(expected_calls), cb.missing_file.call_count)
class AbortTest(AbstractPackTest):
def test_abort_strategise(self):
infile = self.blendfiles / 'subdir/doubly_linked_up.blend'
packer = pack.Packer(infile, self.blendfiles, self.tpath)
class AbortingCallback(progress.Callback):
def trace_blendfile(self, filename: pathlib.Path):
# Call abort() somewhere during the strategise() call.
if filename.name == 'linked_cube.blend':
packer.abort()
packer.progress_cb = AbortingCallback()
with packer, self.assertRaises(pack.Aborted):
packer.strategise()
def test_abort_transfer(self):
infile = self.blendfiles / 'subdir/doubly_linked_up.blend'
packer = pack.Packer(infile, self.blendfiles, self.tpath)
first_file_size = infile.stat().st_size
class AbortingCallback(progress.Callback):
def transfer_progress(self, total_bytes: int, transferred_bytes: int):
# Call abort() somewhere during the file transfer.
if total_bytes > first_file_size * 1.1:
packer.abort()
packer.progress_cb = AbortingCallback()
with packer:
packer.strategise()
with self.assertRaises(pack.Aborted):
packer.execute()