Better transfer error handling

This commit is contained in:
Sybren A. Stüvel 2018-06-05 11:49:55 +02:00
parent 44c4ce3a69
commit fc144138d0
3 changed files with 16 additions and 4 deletions

View File

@ -175,7 +175,9 @@ class Packer:
"""Raises an Aborted exception when abort() was called.""" """Raises an Aborted exception when abort() was called."""
with self._abort_lock: with self._abort_lock:
if not self._aborted.is_set(): if self._file_transferer is not None and self._file_transferer.has_error:
log.error('A transfer error occurred')
elif not self._aborted.is_set():
return return
log.warning('Aborting') log.warning('Aborting')

View File

@ -68,7 +68,8 @@ class FileCopier(transfer.FileTransferer):
# the main thread to inspect the queue and see which files were not # the main thread to inspect the queue and see which files were not
# copied. The one we just failed (due to this exception) should also # copied. The one we just failed (due to this exception) should also
# be reported there. # be reported there.
self.queue.put((src, dst, act)) self.queue.put((src, dst, act), timeout=1.0)
self._error.set()
break break
if files_transferred: if files_transferred:

View File

@ -69,7 +69,8 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
# to finish copying a file. # to finish copying a file.
self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem] self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem]
self.done = threading.Event() self.done = threading.Event()
self._abort = threading.Event() self._abort = threading.Event() # Indicates user-requested abort
self._error = threading.Event() # Indicates abort due to some error
# Instantiate a dummy progress callback so that we can call it # Instantiate a dummy progress callback so that we can call it
# without checking for None all the time. # without checking for None all the time.
@ -85,6 +86,8 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
"""Queue a copy action from 'src' to 'dst'.""" """Queue a copy action from 'src' to 'dst'."""
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called' assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
if self._error.is_set():
return
self.queue.put((src, dst, Action.COPY)) self.queue.put((src, dst, Action.COPY))
self.total_queued_bytes += src.stat().st_size self.total_queued_bytes += src.stat().st_size
@ -92,6 +95,8 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
"""Queue a move action from 'src' to 'dst'.""" """Queue a move action from 'src' to 'dst'."""
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called' assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
if self._error.is_set():
return
self.queue.put((src, dst, Action.MOVE)) self.queue.put((src, dst, Action.MOVE))
self.total_queued_bytes += src.stat().st_size self.total_queued_bytes += src.stat().st_size
@ -151,7 +156,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
"""Generator, yield queued items until the work is done.""" """Generator, yield queued items until the work is done."""
while True: while True:
if self._abort.is_set(): if self._abort.is_set() or self._error.is_set():
return return
try: try:
@ -189,3 +194,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
path.unlink() path.unlink()
except IOError as ex: except IOError as ex:
log.warning('Unable to delete %s: %s', path, ex) log.warning('Unable to delete %s: %s', path, ex)
@property
def has_error(self) -> bool:
return self._error.is_set()