Include error message with error status
This commit is contained in:
parent
03fb4da583
commit
add5cf930b
@ -192,15 +192,17 @@ class Packer:
|
|||||||
"""Raises an Aborted exception when abort() was called."""
|
"""Raises an Aborted exception when abort() was called."""
|
||||||
|
|
||||||
with self._abort_lock:
|
with self._abort_lock:
|
||||||
|
reason = ''
|
||||||
if self._file_transferer is not None and self._file_transferer.has_error:
|
if self._file_transferer is not None and self._file_transferer.has_error:
|
||||||
log.error('A transfer error occurred')
|
log.error('A transfer error occurred')
|
||||||
|
reason = self._file_transferer.error_message()
|
||||||
elif not self._aborted.is_set():
|
elif not self._aborted.is_set():
|
||||||
return
|
return
|
||||||
|
|
||||||
log.warning('Aborting')
|
log.warning('Aborting')
|
||||||
self._tscb.flush()
|
self._tscb.flush()
|
||||||
self._progress_cb.pack_aborted()
|
self._progress_cb.pack_aborted()
|
||||||
raise Aborted()
|
raise Aborted(reason)
|
||||||
|
|
||||||
def exclude(self, *globs: str):
|
def exclude(self, *globs: str):
|
||||||
"""Register glob-compatible patterns of files that should be ignored.
|
"""Register glob-compatible patterns of files that should be ignored.
|
||||||
|
|||||||
@ -58,13 +58,14 @@ class FileCopier(transfer.FileTransferer):
|
|||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
|
|
||||||
pool = multiprocessing.pool.ThreadPool(processes=self.transfer_threads)
|
pool = multiprocessing.pool.ThreadPool(processes=self.transfer_threads)
|
||||||
|
dst = pathlib.Path()
|
||||||
for src, pure_dst, act in self.iter_queue():
|
for src, pure_dst, act in self.iter_queue():
|
||||||
try:
|
try:
|
||||||
if self._error.is_set() or self._abort.is_set():
|
dst = pathlib.Path(pure_dst)
|
||||||
|
|
||||||
|
if self.has_error or self._abort.is_set():
|
||||||
raise AbortTransfer()
|
raise AbortTransfer()
|
||||||
|
|
||||||
dst = pathlib.Path(pure_dst)
|
|
||||||
if self._skip_file(src, dst, act):
|
if self._skip_file(src, dst, act):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -82,8 +83,9 @@ class FileCopier(transfer.FileTransferer):
|
|||||||
if self._abort.is_set():
|
if self._abort.is_set():
|
||||||
log.debug('Error transferring %s to %s: %s', src, dst, ex)
|
log.debug('Error transferring %s to %s: %s', src, dst, ex)
|
||||||
else:
|
else:
|
||||||
log.exception('Error transferring %s to %s', src, dst)
|
msg = 'Error transferring %s to %s' % (src, dst)
|
||||||
self._error.set()
|
log.exception(msg)
|
||||||
|
self.error_set(msg)
|
||||||
# Put the files to copy back into the queue, and abort. This allows
|
# Put the files to copy back into the queue, and abort. This allows
|
||||||
# the main thread to inspect the queue and see which files were not
|
# the main thread to inspect the queue and see which files were not
|
||||||
# copied. The one we just failed (due to this exception) should also
|
# copied. The one we just failed (due to this exception) should also
|
||||||
@ -106,7 +108,7 @@ class FileCopier(transfer.FileTransferer):
|
|||||||
try:
|
try:
|
||||||
tfunc = self.transfer_funcs[src.is_dir(), act]
|
tfunc = self.transfer_funcs[src.is_dir(), act]
|
||||||
|
|
||||||
if self._error.is_set() or self._abort.is_set():
|
if self.has_error or self._abort.is_set():
|
||||||
raise AbortTransfer()
|
raise AbortTransfer()
|
||||||
|
|
||||||
log.info('%s %s -> %s', act.name, src, dst)
|
log.info('%s %s -> %s', act.name, src, dst)
|
||||||
@ -121,8 +123,9 @@ class FileCopier(transfer.FileTransferer):
|
|||||||
if self._abort.is_set():
|
if self._abort.is_set():
|
||||||
log.debug('Error transferring %s to %s: %s', src, dst, ex)
|
log.debug('Error transferring %s to %s: %s', src, dst, ex)
|
||||||
else:
|
else:
|
||||||
log.exception('Error transferring %s to %s', src, dst)
|
msg = 'Error transferring %s to %s' % (src, dst)
|
||||||
self._error.set()
|
log.exception(msg)
|
||||||
|
self.error_set(msg)
|
||||||
# Put the files to copy back into the queue, and abort. This allows
|
# Put the files to copy back into the queue, and abort. This allows
|
||||||
# the main thread to inspect the queue and see which files were not
|
# the main thread to inspect the queue and see which files were not
|
||||||
# copied. The one we just failed (due to this exception) should also
|
# copied. The one we just failed (due to this exception) should also
|
||||||
@ -164,7 +167,7 @@ class FileCopier(transfer.FileTransferer):
|
|||||||
def copyfile(self, srcpath: pathlib.Path, dstpath: pathlib.Path):
|
def copyfile(self, srcpath: pathlib.Path, dstpath: pathlib.Path):
|
||||||
"""Copy a file, skipping when it already exists."""
|
"""Copy a file, skipping when it already exists."""
|
||||||
|
|
||||||
if self._abort.is_set() or self._error.is_set():
|
if self._abort.is_set() or self.has_error:
|
||||||
return
|
return
|
||||||
|
|
||||||
if (srcpath, dstpath) in self.already_copied:
|
if (srcpath, dstpath) in self.already_copied:
|
||||||
@ -204,13 +207,13 @@ class FileCopier(transfer.FileTransferer):
|
|||||||
log.debug('SKIP %s; already copied', src)
|
log.debug('SKIP %s; already copied', src)
|
||||||
return
|
return
|
||||||
|
|
||||||
if self._error.is_set() or self._abort.is_set():
|
if self.has_error or self._abort.is_set():
|
||||||
raise AbortTransfer()
|
raise AbortTransfer()
|
||||||
|
|
||||||
dst.mkdir(parents=True, exist_ok=True)
|
dst.mkdir(parents=True, exist_ok=True)
|
||||||
errors = [] # type: typing.List[typing.Tuple[pathlib.Path, pathlib.Path, str]]
|
errors = [] # type: typing.List[typing.Tuple[pathlib.Path, pathlib.Path, str]]
|
||||||
for srcpath in src.iterdir():
|
for srcpath in src.iterdir():
|
||||||
if self._error.is_set() or self._abort.is_set():
|
if self.has_error or self._abort.is_set():
|
||||||
raise AbortTransfer()
|
raise AbortTransfer()
|
||||||
|
|
||||||
dstpath = dst / srcpath.name
|
dstpath = dst / srcpath.name
|
||||||
|
|||||||
@ -70,7 +70,10 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem]
|
self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem]
|
||||||
self.done = threading.Event()
|
self.done = threading.Event()
|
||||||
self._abort = threading.Event() # Indicates user-requested abort
|
self._abort = threading.Event() # Indicates user-requested abort
|
||||||
self._error = threading.Event() # Indicates abort due to some error
|
|
||||||
|
self.__error_mutex = threading.Lock()
|
||||||
|
self.__error = threading.Event() # Indicates abort due to some error
|
||||||
|
self.__error_message = ''
|
||||||
|
|
||||||
# Instantiate a dummy progress callback so that we can call it
|
# Instantiate a dummy progress callback so that we can call it
|
||||||
# without checking for None all the time.
|
# without checking for None all the time.
|
||||||
@ -86,7 +89,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
"""Queue a copy action from 'src' to 'dst'."""
|
"""Queue a copy action from 'src' to 'dst'."""
|
||||||
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
|
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
|
||||||
assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
|
assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
|
||||||
if self._error.is_set():
|
if self.__error.is_set():
|
||||||
return
|
return
|
||||||
self.queue.put((src, dst, Action.COPY))
|
self.queue.put((src, dst, Action.COPY))
|
||||||
self.total_queued_bytes += src.stat().st_size
|
self.total_queued_bytes += src.stat().st_size
|
||||||
@ -95,7 +98,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
"""Queue a move action from 'src' to 'dst'."""
|
"""Queue a move action from 'src' to 'dst'."""
|
||||||
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
|
assert not self.done.is_set(), 'Queueing not allowed after done_and_join() was called'
|
||||||
assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
|
assert not self._abort.is_set(), 'Queueing not allowed after abort_and_join() was called'
|
||||||
if self._error.is_set():
|
if self.__error.is_set():
|
||||||
return
|
return
|
||||||
self.queue.put((src, dst, Action.MOVE))
|
self.queue.put((src, dst, Action.MOVE))
|
||||||
self.total_queued_bytes += src.stat().st_size
|
self.total_queued_bytes += src.stat().st_size
|
||||||
@ -156,7 +159,7 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
"""Generator, yield queued items until the work is done."""
|
"""Generator, yield queued items until the work is done."""
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if self._abort.is_set() or self._error.is_set():
|
if self._abort.is_set() or self.__error.is_set():
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -197,4 +200,22 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def has_error(self) -> bool:
|
def has_error(self) -> bool:
|
||||||
return self._error.is_set()
|
return self.__error.is_set()
|
||||||
|
|
||||||
|
def error_set(self, message: str):
|
||||||
|
"""Indicate an error occurred, and provide a message."""
|
||||||
|
|
||||||
|
with self.__error_mutex:
|
||||||
|
# Avoid overwriting previous error messages.
|
||||||
|
if self.__error.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
|
self.__error.set()
|
||||||
|
self.__error_message = message
|
||||||
|
|
||||||
|
def error_message(self) -> str:
|
||||||
|
"""Retrieve the error messsage, or an empty string if no error occurred."""
|
||||||
|
with self.__error_mutex:
|
||||||
|
if not self.__error.is_set():
|
||||||
|
return ''
|
||||||
|
return self.__error_message
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user