Better handling of KeyboardInterrupt to abort file transfers

This commit is contained in:
Sybren A. Stüvel 2018-03-09 16:30:46 +01:00
parent a2ca66a2f6
commit 15ca2f98ab
2 changed files with 40 additions and 17 deletions

View File

@ -7,7 +7,6 @@ import tempfile
import typing import typing
from blender_asset_tracer import trace, bpathlib, blendfile from blender_asset_tracer import trace, bpathlib, blendfile
from blender_asset_tracer.cli import common
from blender_asset_tracer.trace import result from blender_asset_tracer.trace import result
from . import queued_copy, transfer from . import queued_copy, transfer
@ -63,6 +62,7 @@ class Packer:
self.target = target self.target = target
self.noop = noop self.noop = noop
from blender_asset_tracer.cli import common
self._shorten = functools.partial(common.shorten, self.project) self._shorten = functools.partial(common.shorten, self.project)
if noop: if noop:
@ -184,13 +184,18 @@ class Packer:
if not self.noop: if not self.noop:
ft.start() ft.start()
for asset_path, action in self._actions.items(): try:
self._copy_asset_and_deps(asset_path, action, ft) for asset_path, action in self._actions.items():
self._copy_asset_and_deps(asset_path, action, ft)
if self.noop: if self.noop:
log.info('Would copy %d files to %s', self._file_count, self.target) log.info('Would copy %d files to %s', self._file_count, self.target)
return return
ft.done_and_join() ft.done_and_join()
except KeyboardInterrupt:
log.info('File transfer interrupted with Ctrl+C, aborting.')
ft.abort_and_join()
raise
def _rewrite_paths(self) -> None: def _rewrite_paths(self) -> None:
"""Rewrite paths to the new location of the assets. """Rewrite paths to the new location of the assets.

View File

@ -1,10 +1,13 @@
import abc import abc
import enum import enum
import logging
import pathlib import pathlib
import queue import queue
import threading import threading
import typing import typing
log = logging.getLogger(__name__)
class FileTransferError(IOError): class FileTransferError(IOError):
"""Raised when one or more files could not be transferred.""" """Raised when one or more files could not be transferred."""
@ -25,12 +28,6 @@ QueueItem = typing.Tuple[pathlib.Path, pathlib.Path, Action]
class FileTransferer(metaclass=abc.ABCMeta): class FileTransferer(metaclass=abc.ABCMeta):
"""Interface for file transfer classes.""" """Interface for file transfer classes."""
class Empty(queue.Empty):
"""No more files to transfer, but more may be queued later."""
class Done(queue.Empty):
"""No more files to transfer, work is done."""
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
@ -45,13 +42,18 @@ class FileTransferer(metaclass=abc.ABCMeta):
# to finish copying a file. # to finish copying a file.
self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem] self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem]
self.done = threading.Event() self.done = threading.Event()
self.abort = threading.Event()
def queue_copy(self, src: pathlib.Path, dst: pathlib.Path): def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a copy action from 'src' to 'dst'.""" """Queue a copy action from 'src' to 'dst'."""
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
self.queue.put((src, dst, Action.COPY)) self.queue.put((src, dst, Action.COPY))
def queue_move(self, src: pathlib.Path, dst: pathlib.Path): def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a move action from 'src' to 'dst'.""" """Queue a move action from 'src' to 'dst'."""
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
assert not self.abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
self.queue.put((src, dst, Action.MOVE)) self.queue.put((src, dst, Action.MOVE))
def done_and_join(self) -> None: def done_and_join(self) -> None:
@ -69,15 +71,31 @@ class FileTransferer(metaclass=abc.ABCMeta):
if not self.queue.empty(): if not self.queue.empty():
# Flush the queue so that we can report which files weren't copied yet. # Flush the queue so that we can report which files weren't copied yet.
files_remaining = [] files_remaining = self._files_remaining()
while not self.queue.empty():
src, dst, act = self.queue.get_nowait()
files_remaining.append(src)
assert files_remaining assert files_remaining
raise FileTransferError( raise FileTransferError(
"%d files couldn't be transferred" % len(files_remaining), "%d files couldn't be transferred" % len(files_remaining),
files_remaining) files_remaining)
def _files_remaining(self) -> typing.List[pathlib.Path]:
"""Source files that were queued but not transferred."""
files_remaining = []
while not self.queue.empty():
src, dst, act = self.queue.get_nowait()
files_remaining.append(src)
return files_remaining
def abort_and_join(self) -> None:
"""Abort the file transfer, and wait until done."""
self.abort.set()
self.join()
files_remaining = self._files_remaining()
if not files_remaining:
return
log.warning("%d files couldn't be transferred, starting with %s",
len(files_remaining), files_remaining[0])
def iter_queue(self) -> typing.Iterable[QueueItem]: def iter_queue(self) -> typing.Iterable[QueueItem]:
"""Generator, yield queued items until the work is done.""" """Generator, yield queued items until the work is done."""