From 15ca2f98ab10c9a082ca3a39f418e96ec0678328 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 9 Mar 2018 16:30:46 +0100 Subject: [PATCH] Better handling of KeyboardInterrupt to abort file transfers --- blender_asset_tracer/pack/__init__.py | 19 +++++++++----- blender_asset_tracer/pack/transfer.py | 38 ++++++++++++++++++++------- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/blender_asset_tracer/pack/__init__.py b/blender_asset_tracer/pack/__init__.py index 640d025..b5c6717 100644 --- a/blender_asset_tracer/pack/__init__.py +++ b/blender_asset_tracer/pack/__init__.py @@ -7,7 +7,6 @@ import tempfile import typing from blender_asset_tracer import trace, bpathlib, blendfile -from blender_asset_tracer.cli import common from blender_asset_tracer.trace import result from . import queued_copy, transfer @@ -63,6 +62,7 @@ class Packer: self.target = target self.noop = noop + from blender_asset_tracer.cli import common self._shorten = functools.partial(common.shorten, self.project) if noop: @@ -184,13 +184,18 @@ class Packer: if not self.noop: ft.start() - for asset_path, action in self._actions.items(): - self._copy_asset_and_deps(asset_path, action, ft) + try: + for asset_path, action in self._actions.items(): + self._copy_asset_and_deps(asset_path, action, ft) - if self.noop: - log.info('Would copy %d files to %s', self._file_count, self.target) - return - ft.done_and_join() + if self.noop: + log.info('Would copy %d files to %s', self._file_count, self.target) + return + ft.done_and_join() + except KeyboardInterrupt: + log.info('File transfer interrupted with Ctrl+C, aborting.') + ft.abort_and_join() + raise def _rewrite_paths(self) -> None: """Rewrite paths to the new location of the assets. diff --git a/blender_asset_tracer/pack/transfer.py b/blender_asset_tracer/pack/transfer.py index d5e96b7..5a3b4e0 100644 --- a/blender_asset_tracer/pack/transfer.py +++ b/blender_asset_tracer/pack/transfer.py @@ -1,10 +1,13 @@ import abc import enum +import logging import pathlib import queue import threading import typing +log = logging.getLogger(__name__) + class FileTransferError(IOError): """Raised when one or more files could not be transferred.""" @@ -25,12 +28,6 @@ QueueItem = typing.Tuple[pathlib.Path, pathlib.Path, Action] class FileTransferer(metaclass=abc.ABCMeta): """Interface for file transfer classes.""" - class Empty(queue.Empty): - """No more files to transfer, but more may be queued later.""" - - class Done(queue.Empty): - """No more files to transfer, work is done.""" - def __init__(self) -> None: super().__init__() @@ -45,13 +42,18 @@ class FileTransferer(metaclass=abc.ABCMeta): # to finish copying a file. self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem] self.done = threading.Event() + self.abort = threading.Event() def queue_copy(self, src: pathlib.Path, dst: pathlib.Path): """Queue a copy action from 'src' to 'dst'.""" + assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' + assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called' self.queue.put((src, dst, Action.COPY)) def queue_move(self, src: pathlib.Path, dst: pathlib.Path): """Queue a move action from 'src' to 'dst'.""" + assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' + assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called' self.queue.put((src, dst, Action.MOVE)) def done_and_join(self) -> None: @@ -69,15 +71,31 @@ class FileTransferer(metaclass=abc.ABCMeta): if not self.queue.empty(): # Flush the queue so that we can report which files weren't copied yet. - files_remaining = [] - while not self.queue.empty(): - src, dst, act = self.queue.get_nowait() - files_remaining.append(src) + files_remaining = self._files_remaining() assert files_remaining raise FileTransferError( "%d files couldn't be transferred" % len(files_remaining), files_remaining) + def _files_remaining(self) -> typing.List[pathlib.Path]: + """Source files that were queued but not transferred.""" + files_remaining = [] + while not self.queue.empty(): + src, dst, act = self.queue.get_nowait() + files_remaining.append(src) + return files_remaining + + def abort_and_join(self) -> None: + """Abort the file transfer, and wait until done.""" + + self.abort.set() + self.join() + + files_remaining = self._files_remaining() + if not files_remaining: + return + log.warning("%d files couldn't be transferred, starting with %s", + len(files_remaining), files_remaining[0]) def iter_queue(self) -> typing.Iterable[QueueItem]: """Generator, yield queued items until the work is done."""