blender_asset_tracer/pack/transfer.py

238 lines
8.0 KiB
Python
Executable File

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
#
# (c) 2018, Blender Foundation - Sybren A. Stüvel
import abc
import enum
import logging
import pathlib
import queue
import threading
import time
import typing
from . import progress
log = logging.getLogger(__name__)
class FileTransferError(IOError):
"""Raised when one or more files could not be transferred."""
def __init__(self, message, files_remaining: typing.List[pathlib.Path]) -> None:
super().__init__(message)
self.files_remaining = files_remaining
class Action(enum.Enum):
COPY = 1
MOVE = 2
QueueItem = typing.Tuple[pathlib.Path, pathlib.PurePath, Action]
class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
"""Abstract superclass for file transfer classes.
Implement a run() function in a subclass that performs the actual file
transfer.
"""
def __init__(self) -> None:
super().__init__()
self.log = log.getChild("FileTransferer")
# For copying in a different process. By using a priority queue the files
# are automatically sorted alphabetically, which means we go through all files
# in a single directory at a time. This should be faster to copy than random
# access. The order isn't guaranteed, though, as we're not waiting around for
# all file paths to be known before copying starts.
# maxsize=100 is just a guess as to a reasonable upper limit. When this limit
# is reached, the main thread will simply block while waiting for this thread
# to finish copying a file.
self.queue = queue.PriorityQueue(
maxsize=100
) # type: queue.PriorityQueue[QueueItem]
self.done = threading.Event()
self._abort = threading.Event() # Indicates user-requested abort
self.__error_mutex = threading.Lock()
self.__error = threading.Event() # Indicates abort due to some error
self.__error_message = ""
# Instantiate a dummy progress callback so that we can call it
# without checking for None all the time.
self.progress_cb = progress.ThreadSafeCallback(progress.Callback())
self.total_queued_bytes = 0
self.total_transferred_bytes = 0
@abc.abstractmethod
def run(self):
"""Perform actual file transfer in a thread."""
def queue_copy(self, src: pathlib.Path, dst: pathlib.PurePath):
"""Queue a copy action from 'src' to 'dst'."""
assert (
not self.done.is_set()
), "Queueing not allowed after done_and_join() was called"
assert (
not self._abort.is_set()
), "Queueing not allowed after abort_and_join() was called"
if self.__error.is_set():
return
self.queue.put((src, dst, Action.COPY))
self.total_queued_bytes += src.stat().st_size
def queue_move(self, src: pathlib.Path, dst: pathlib.PurePath):
"""Queue a move action from 'src' to 'dst'."""
assert (
not self.done.is_set()
), "Queueing not allowed after done_and_join() was called"
assert (
not self._abort.is_set()
), "Queueing not allowed after abort_and_join() was called"
if self.__error.is_set():
return
self.queue.put((src, dst, Action.MOVE))
self.total_queued_bytes += src.stat().st_size
def report_transferred(self, bytes_transferred: int):
"""Report transfer of `block_size` bytes."""
self.total_transferred_bytes += bytes_transferred
self.progress_cb.transfer_progress(
self.total_queued_bytes, self.total_transferred_bytes
)
def done_and_join(self) -> None:
"""Indicate all files have been queued, and wait until done.
After this function has been called, the queue_xxx() methods should not
be called any more.
:raises FileTransferError: if there was an error transferring one or
more files.
"""
self.done.set()
self.join()
if not self.queue.empty():
# Flush the queue so that we can report which files weren't copied yet.
files_remaining = self._files_remaining()
assert files_remaining
raise FileTransferError(
"%d files couldn't be transferred" % len(files_remaining),
files_remaining,
)
def _files_remaining(self) -> typing.List[pathlib.Path]:
"""Source files that were queued but not transferred."""
files_remaining = []
while not self.queue.empty():
src, dst, act = self.queue.get_nowait()
files_remaining.append(src)
return files_remaining
def abort(self) -> None:
"""Abort the file transfer, immediately returns."""
log.info("Aborting")
self._abort.set()
def abort_and_join(self) -> None:
"""Abort the file transfer, and wait until done."""
self.abort()
self.join()
files_remaining = self._files_remaining()
if not files_remaining:
return
log.warning(
"%d files couldn't be transferred, starting with %s",
len(files_remaining),
files_remaining[0],
)
def iter_queue(self) -> typing.Iterable[QueueItem]:
"""Generator, yield queued items until the work is done."""
while True:
if self._abort.is_set() or self.__error.is_set():
return
try:
src, dst, action = self.queue.get(timeout=0.5)
self.progress_cb.transfer_file(src, dst)
yield src, dst, action
except queue.Empty:
if self.done.is_set():
return
def join(self, timeout: float = None) -> None:
"""Wait for the transfer to finish/stop."""
if timeout:
run_until = time.time() + timeout
else:
run_until = float("inf")
# We can't simply block the thread, we have to keep watching the
# progress queue.
while self.is_alive():
if time.time() > run_until:
self.log.warning("Timeout while waiting for transfer to finish")
return
self.progress_cb.flush(timeout=0.5)
# Since Thread.join() neither returns anything nor raises any exception
# when timing out, we don't even have to call it any more.
def delete_file(self, path: pathlib.Path):
"""Deletes a file, only logging a warning if deletion fails."""
log.debug("Deleting %s, file has been transferred", path)
try:
path.unlink()
except IOError as ex:
log.warning("Unable to delete %s: %s", path, ex)
@property
def has_error(self) -> bool:
return self.__error.is_set()
def error_set(self, message: str):
"""Indicate an error occurred, and provide a message."""
with self.__error_mutex:
# Avoid overwriting previous error messages.
if self.__error.is_set():
return
self.__error.set()
self.__error_message = message
def error_message(self) -> str:
"""Retrieve the error messsage, or an empty string if no error occurred."""
with self.__error_mutex:
if not self.__error.is_set():
return ""
return self.__error_message