diff --git a/blender_asset_tracer/pack/queued_copy.py b/blender_asset_tracer/pack/queued_copy.py
index 3863ac4..45428d5 100644
--- a/blender_asset_tracer/pack/queued_copy.py
+++ b/blender_asset_tracer/pack/queued_copy.py
@@ -1,80 +1,38 @@
-import enum
 import logging
 import threading
-import pathlib
-import queue
 import shutil
-import typing
 
 from . import transfer
 
 log = logging.getLogger(__name__)
 
 
-class Action(enum.Enum):
-    COPY = 1
-    MOVE = 2
-
-
 class FileCopier(threading.Thread, transfer.FileTransferer):
     """Copies or moves files in source directory order."""
 
-    def __init__(self, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
-
-        # For copying in a different process. By using a priority queue the files
-        # are automatically sorted alphabetically, which means we go through all files
-        # in a single directory at a time. This should be faster to copy than random
-        # access. The order isn't guaranteed, though, as we're not waiting around for
-        # all file paths to be known before copying starts.
-
-        # maxsize=100 is just a guess as to a reasonable upper limit. When this limit
-        # is reached, the main thread will simply block while waiting for this thread
-        # to finish copying a file.
-        self.queue = queue.PriorityQueue(maxsize=100) \
-            # type: queue.PriorityQueue[typing.Tuple[pathlib.Path, pathlib.Path, Action]]
-        self.done = threading.Event()
-
-    def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
-        """Queue a copy action from 'src' to 'dst'."""
-        self.queue.put((src, dst, Action.COPY))
-
-    def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
-        """Queue a move action from 'src' to 'dst'."""
-        self.queue.put((src, dst, Action.MOVE))
-
-    def done_and_join(self):
-        """Indicate all files have been queued, and wait until done."""
-
-        self.done.set()
-        self.join()
-
-        if not self.queue.empty():
-            # Flush the queue so that we can report which files weren't copied yet.
-            files_remaining = []
-            while not self.queue.empty():
-                src, dst = self.queue.get_nowait()
-                files_remaining.append(src)
-            assert files_remaining
-            raise transfer.FileTransferError(
-                "%d files couldn't be transferred" % len(files_remaining),
-                files_remaining)
+    def __init__(self):
+        # Thread.__init__ doesn't call super().__init__(), so it doesn't get
+        # chained to transfer.FileTransferer.__init__(). However, Thread has to
+        # be the first base class so that its start() and join() methods are the
+        # ones that get used and Just Work™.
+        threading.Thread.__init__(self)
+        transfer.FileTransferer.__init__(self)
 
     def run(self):
         files_transferred = 0
         files_skipped = 0
 
         transfer_funcs = {
-            Action.COPY: shutil.copy,
-            Action.MOVE: shutil.move,
+            transfer.Action.COPY: shutil.copy,
+            transfer.Action.MOVE: shutil.move,
         }
 
         while True:
             try:
-                src, dst, act = self.queue.get(timeout=0.1)
-            except queue.Empty:
-                if self.done.is_set():
-                    break
+                src, dst, act = self.pop_queued()
+            except self.Done:
+                break
+            except self.Empty:
                 continue
 
             try:
@@ -83,7 +41,7 @@ class FileCopier(threading.Thread, transfer.FileTransferer):
                     st_dst = dst.stat()
                     if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime:
                         log.info('SKIP %s; already exists', src)
-                        if act == Action.MOVE:
+                        if act == transfer.Action.MOVE:
                             log.debug('Deleting %s', src)
                             src.unlink()
                         files_skipped += 1
diff --git a/blender_asset_tracer/pack/transfer.py b/blender_asset_tracer/pack/transfer.py
index 11b7816..38a05bf 100644
--- a/blender_asset_tracer/pack/transfer.py
+++ b/blender_asset_tracer/pack/transfer.py
@@ -1,5 +1,8 @@
 import abc
+import enum
 import pathlib
+import queue
+import threading
 import typing
 
 
@@ -11,28 +14,46 @@ class FileTransferError(IOError):
         self.files_remaining = files_remaining
 
 
+class Action(enum.Enum):
+    COPY = 1
+    MOVE = 2
+
+
+QueueItem = typing.Tuple[pathlib.Path, pathlib.Path, Action]
+
+
 class FileTransferer(metaclass=abc.ABCMeta):
     """Interface for file transfer classes."""
 
-    @abc.abstractmethod
-    def start(self):
-        """Starts the file transfer thread/process.
+    class Empty(queue.Empty):
+        """No more files to transfer, but more may be queued later."""
 
-        This could spin up a separate thread to perform the actual file
-        transfer. After start() is called, implementations should still accept
-        calls to the queue_xxx() methods. In other words, this is not to be
-        used as queue-and-then-start, but as start-and-then-queue.
-        """
+    class Done(queue.Empty):
+        """No more files to transfer, work is done."""
 
-    @abc.abstractmethod
-    def queue_copy(self, src: pathlib.Path, dst: pathlib.Path) -> None:
+    def __init__(self) -> None:
+        super().__init__()
+
+        # For copying in a different process. By using a priority queue the files
+        # are automatically sorted alphabetically, which means we go through all files
+        # in a single directory at a time. This should be faster to copy than random
+        # access. The order isn't guaranteed, though, as we're not waiting around for
+        # all file paths to be known before copying starts.
+
+        # maxsize=100 is just a guess as to a reasonable upper limit. When this limit
+        # is reached, the main thread will simply block while waiting for this thread
+        # to finish copying a file.
+        self.queue = queue.PriorityQueue(maxsize=100)  # type: queue.PriorityQueue[QueueItem]
+        self.done = threading.Event()
+
+    def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
         """Queue a copy action from 'src' to 'dst'."""
+        self.queue.put((src, dst, Action.COPY))
 
-    @abc.abstractmethod
-    def queue_move(self, src: pathlib.Path, dst: pathlib.Path) -> None:
+    def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
         """Queue a move action from 'src' to 'dst'."""
+        self.queue.put((src, dst, Action.MOVE))
 
-    @abc.abstractmethod
     def done_and_join(self) -> None:
         """Indicate all files have been queued, and wait until done.
 
@@ -42,3 +63,47 @@ class FileTransferer(metaclass=abc.ABCMeta):
         :raises FileTransferError: if there was an error transferring one or
             more files.
         """
+
+        self.done.set()
+        self.join()
+
+        if not self.queue.empty():
+            # Flush the queue so that we can report which files weren't copied yet.
+            files_remaining = []
+            while not self.queue.empty():
+                src, _dst, _act = self.queue.get_nowait()
+                files_remaining.append(src)
+            assert files_remaining
+            raise FileTransferError(
+                "%d files couldn't be transferred" % len(files_remaining),
+                files_remaining)
+
+    def pop_queued(self) -> QueueItem:
+        """Pops an item off the queue, waiting 0.1 sec if the queue is empty.
+
+        :returns: the next (src, dst, action) tuple to transfer.
+        :raises Done: when all files have been handled, and the work is done.
+        :raises Empty: when the queue is empty, but more files may be queued
+            in the future.
+        """
+
+        try:
+            return self.queue.get(timeout=0.1)
+        except queue.Empty:
+            if self.done.is_set():
+                raise self.Done()
+            raise self.Empty()
+
+    @abc.abstractmethod
+    def start(self) -> None:
+        """Starts the file transfer thread/process.
+
+        This could spin up a separate thread to perform the actual file
+        transfer. After start() is called, implementations should still accept
+        calls to the queue_xxx() methods. In other words, this is not to be
+        used as queue-and-then-start, but as start-and-then-queue.
+        """
+
+    @abc.abstractmethod
+    def join(self, timeout=None):
+        """Wait for the thread/process to stop."""