Sybren A. Stüvel 606377180c Pack: always pack files, explode directories to a list of files
When an asset is represented as directory in Blender (for example fluid
simulation caches), that directory is traced and each file is considered
an asset.

This makes it considerably easier for Shaman clients, as they need to
compute the SHA256 checksum of each file. The logic to transform a
directory path to a list of the contained files is now in BAT itself.
2023-11-02 14:53:52 +01:00

243 lines
8.3 KiB
Python

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
#
# (c) 2018, Blender Foundation - Sybren A. Stüvel
import abc
import enum
import logging
import pathlib
import queue
import threading
import time
import typing
from typing import Optional
from . import progress
log = logging.getLogger(__name__)
class FileTransferError(IOError):
"""Raised when one or more files could not be transferred."""
def __init__(self, message, files_remaining: typing.List[pathlib.Path]) -> None:
super().__init__(message)
self.files_remaining = files_remaining
class Action(enum.Enum):
COPY = 1
MOVE = 2
QueueItem = typing.Tuple[pathlib.Path, pathlib.PurePath, Action]
class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
"""Abstract superclass for file transfer classes.
Implement a run() function in a subclass that performs the actual file
transfer.
"""
def __init__(self) -> None:
super().__init__()
self.log = log.getChild("FileTransferer")
# For copying in a different process. By using a priority queue the files
# are automatically sorted alphabetically, which means we go through all files
# in a single directory at a time. This should be faster to copy than random
# access. The order isn't guaranteed, though, as we're not waiting around for
# all file paths to be known before copying starts.
# maxsize=100 is just a guess as to a reasonable upper limit. When this limit
# is reached, the main thread will simply block while waiting for this thread
# to finish copying a file.
self.queue = queue.PriorityQueue(
maxsize=100
) # type: queue.PriorityQueue[QueueItem]
self.done = threading.Event()
self._abort = threading.Event() # Indicates user-requested abort
self.__error_mutex = threading.Lock()
self.__error = threading.Event() # Indicates abort due to some error
self.__error_message = ""
# Instantiate a dummy progress callback so that we can call it
# without checking for None all the time.
self.progress_cb = progress.ThreadSafeCallback(progress.Callback())
self.total_queued_bytes = 0
self.total_transferred_bytes = 0
@abc.abstractmethod
def run(self):
"""Perform actual file transfer in a thread."""
def queue_copy(self, src: pathlib.Path, dst: pathlib.PurePath):
"""Queue a copy action from 'src' to 'dst'."""
if src.is_dir():
raise TypeError(f"only files can be copied, not directories: {src}")
assert (
not self.done.is_set()
), "Queueing not allowed after done_and_join() was called"
assert (
not self._abort.is_set()
), "Queueing not allowed after abort_and_join() was called"
if self.__error.is_set():
return
self.queue.put((src, dst, Action.COPY))
self.total_queued_bytes += src.stat().st_size
def queue_move(self, src: pathlib.Path, dst: pathlib.PurePath):
"""Queue a move action from 'src' to 'dst'."""
if src.is_dir():
raise TypeError(f"only files can be moved, not directories: {src}")
assert (
not self.done.is_set()
), "Queueing not allowed after done_and_join() was called"
assert (
not self._abort.is_set()
), "Queueing not allowed after abort_and_join() was called"
if self.__error.is_set():
return
self.queue.put((src, dst, Action.MOVE))
self.total_queued_bytes += src.stat().st_size
def report_transferred(self, bytes_transferred: int):
"""Report transfer of `block_size` bytes."""
self.total_transferred_bytes += bytes_transferred
self.progress_cb.transfer_progress(
self.total_queued_bytes, self.total_transferred_bytes
)
def done_and_join(self) -> None:
"""Indicate all files have been queued, and wait until done.
After this function has been called, the queue_xxx() methods should not
be called any more.
:raises FileTransferError: if there was an error transferring one or
more files.
"""
self.done.set()
self.join()
if not self.queue.empty():
# Flush the queue so that we can report which files weren't copied yet.
files_remaining = self._files_remaining()
assert files_remaining
raise FileTransferError(
"%d files couldn't be transferred" % len(files_remaining),
files_remaining,
)
def _files_remaining(self) -> typing.List[pathlib.Path]:
"""Source files that were queued but not transferred."""
files_remaining = []
while not self.queue.empty():
src, dst, act = self.queue.get_nowait()
files_remaining.append(src)
return files_remaining
def abort(self) -> None:
"""Abort the file transfer, immediately returns."""
log.info("Aborting")
self._abort.set()
def abort_and_join(self) -> None:
"""Abort the file transfer, and wait until done."""
self.abort()
self.join()
files_remaining = self._files_remaining()
if not files_remaining:
return
log.warning(
"%d files couldn't be transferred, starting with %s",
len(files_remaining),
files_remaining[0],
)
def iter_queue(self) -> typing.Iterable[QueueItem]:
"""Generator, yield queued items until the work is done."""
while True:
if self._abort.is_set() or self.__error.is_set():
return
try:
src, dst, action = self.queue.get(timeout=0.5)
self.progress_cb.transfer_file(src, dst)
yield src, dst, action
except queue.Empty:
if self.done.is_set():
return
def join(self, timeout: Optional[float] = None) -> None:
"""Wait for the transfer to finish/stop."""
if timeout:
run_until = time.time() + timeout
else:
run_until = float("inf")
# We can't simply block the thread, we have to keep watching the
# progress queue.
while self.is_alive():
if time.time() > run_until:
self.log.warning("Timeout while waiting for transfer to finish")
return
self.progress_cb.flush(timeout=0.5)
# Since Thread.join() neither returns anything nor raises any exception
# when timing out, we don't even have to call it any more.
def delete_file(self, path: pathlib.Path):
"""Deletes a file, only logging a warning if deletion fails."""
log.debug("Deleting %s, file has been transferred", path)
try:
path.unlink()
except IOError as ex:
log.warning("Unable to delete %s: %s", path, ex)
@property
def has_error(self) -> bool:
return self.__error.is_set()
def error_set(self, message: str):
"""Indicate an error occurred, and provide a message."""
with self.__error_mutex:
# Avoid overwriting previous error messages.
if self.__error.is_set():
return
self.__error.set()
self.__error_message = message
def error_message(self) -> str:
"""Retrieve the error messsage, or an empty string if no error occurred."""
with self.__error_mutex:
if not self.__error.is_set():
return ""
return self.__error_message