From fc144138d0eff1d85fc6fb76e83224aa25683f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 5 Jun 2018 11:49:55 +0200 Subject: [PATCH] Better transfer error handling --- blender_asset_tracer/pack/__init__.py | 4 +++- blender_asset_tracer/pack/filesystem.py | 3 ++- blender_asset_tracer/pack/transfer.py | 13 +++++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/blender_asset_tracer/pack/__init__.py b/blender_asset_tracer/pack/__init__.py index 1923996..51418f7 100644 --- a/blender_asset_tracer/pack/__init__.py +++ b/blender_asset_tracer/pack/__init__.py @@ -175,7 +175,9 @@ class Packer: """Raises an Aborted exception when abort() was called.""" with self._abort_lock: - if not self._aborted.is_set(): + if self._file_transferer is not None and self._file_transferer.has_error: + log.error('A transfer error occurred') + elif not self._aborted.is_set(): return log.warning('Aborting') diff --git a/blender_asset_tracer/pack/filesystem.py b/blender_asset_tracer/pack/filesystem.py index 251bbd1..b059ec2 100644 --- a/blender_asset_tracer/pack/filesystem.py +++ b/blender_asset_tracer/pack/filesystem.py @@ -68,7 +68,8 @@ class FileCopier(transfer.FileTransferer): # the main thread to inspect the queue and see which files were not # copied. The one we just failed (due to this exception) should also # be reported there. - self.queue.put((src, dst, act)) + self.queue.put((src, dst, act), timeout=1.0) + self._error.set() break if files_transferred: diff --git a/blender_asset_tracer/pack/transfer.py b/blender_asset_tracer/pack/transfer.py index 5f1f9bd..35baa14 100644 --- a/blender_asset_tracer/pack/transfer.py +++ b/blender_asset_tracer/pack/transfer.py @@ -69,7 +69,8 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): # to finish copying a file. self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem] self.done = threading.Event() - self._abort = threading.Event() + self._abort = threading.Event() # Indicates user-requested abort + self._error = threading.Event() # Indicates abort due to some error # Instantiate a dummy progress callback so that we can call it # without checking for None all the time. @@ -85,6 +86,8 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): """Queue a copy action from 'src' to 'dst'.""" assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called' + if self._error.is_set(): + return self.queue.put((src, dst, Action.COPY)) self.total_queued_bytes += src.stat().st_size @@ -92,6 +95,8 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): """Queue a move action from 'src' to 'dst'.""" assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called' assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called' + if self._error.is_set(): + return self.queue.put((src, dst, Action.MOVE)) self.total_queued_bytes += src.stat().st_size @@ -151,7 +156,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): """Generator, yield queued items until the work is done.""" while True: - if self._abort.is_set(): + if self._abort.is_set() or self._error.is_set(): return try: @@ -189,3 +194,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta): path.unlink() except IOError as ex: log.warning('Unable to delete %s: %s', path, ex) + + @property + def has_error(self) -> bool: + return self._error.is_set()