Moved common code to FileTransferer class

Many implementations will want to use the same queueing mechanism.
This commit is contained in:
Sybren A. Stüvel 2018-03-09 15:08:50 +01:00
parent 3ae39104a9
commit 1a37cb5ba1
2 changed files with 91 additions and 69 deletions

View File

@ -1,80 +1,38 @@
import enum
import logging import logging
import threading import threading
import pathlib
import queue
import shutil import shutil
import typing
from . import transfer from . import transfer
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class Action(enum.Enum):
COPY = 1
MOVE = 2
class FileCopier(threading.Thread, transfer.FileTransferer): class FileCopier(threading.Thread, transfer.FileTransferer):
"""Copies or moves files in source directory order.""" """Copies or moves files in source directory order."""
def __init__(self, *args, **kwargs) -> None: def __init__(self):
super().__init__(*args, **kwargs) # Stupid Thread.__init__ doesn't call super().__init__(),
# so it doesn't get chained to transfer.FileTransferer.__init__().
# For copying in a different process. By using a priority queue the files # However, I want to have Thread as first subclass so that its
# are automatically sorted alphabetically, which means we go through all files # start() and join() methods Just Work™.
# in a single directory at a time. This should be faster to copy than random threading.Thread.__init__(self)
# access. The order isn't guaranteed, though, as we're not waiting around for transfer.FileTransferer.__init__(self)
# all file paths to be known before copying starts.
# maxsize=100 is just a guess as to a reasonable upper limit. When this limit
# is reached, the main thread will simply block while waiting for this thread
# to finish copying a file.
self.queue = queue.PriorityQueue(maxsize=100) \
# type: queue.PriorityQueue[typing.Tuple[pathlib.Path, pathlib.Path, Action]]
self.done = threading.Event()
def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a copy action from 'src' to 'dst'."""
self.queue.put((src, dst, Action.COPY))
def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a move action from 'src' to 'dst'."""
self.queue.put((src, dst, Action.MOVE))
def done_and_join(self):
"""Indicate all files have been queued, and wait until done."""
self.done.set()
self.join()
if not self.queue.empty():
# Flush the queue so that we can report which files weren't copied yet.
files_remaining = []
while not self.queue.empty():
src, dst = self.queue.get_nowait()
files_remaining.append(src)
assert files_remaining
raise transfer.FileTransferError(
"%d files couldn't be transferred" % len(files_remaining),
files_remaining)
def run(self): def run(self):
files_transferred = 0 files_transferred = 0
files_skipped = 0 files_skipped = 0
transfer_funcs = { transfer_funcs = {
Action.COPY: shutil.copy, transfer.Action.COPY: shutil.copy,
Action.MOVE: shutil.move, transfer.Action.MOVE: shutil.move,
} }
while True: while True:
try: try:
src, dst, act = self.queue.get(timeout=0.1) src, dst, act = self.pop_queued()
except queue.Empty: except self.Done:
if self.done.is_set():
break break
except self.Empty:
continue continue
try: try:
@ -83,7 +41,7 @@ class FileCopier(threading.Thread, transfer.FileTransferer):
st_dst = dst.stat() st_dst = dst.stat()
if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime: if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime:
log.info('SKIP %s; already exists', src) log.info('SKIP %s; already exists', src)
if act == Action.MOVE: if act == transfer.Action.MOVE:
log.debug('Deleting %s', src) log.debug('Deleting %s', src)
src.unlink() src.unlink()
files_skipped += 1 files_skipped += 1

View File

@ -1,5 +1,8 @@
import abc import abc
import enum
import pathlib import pathlib
import queue
import threading
import typing import typing
@ -11,28 +14,46 @@ class FileTransferError(IOError):
self.files_remaining = files_remaining self.files_remaining = files_remaining
class Action(enum.Enum):
COPY = 1
MOVE = 2
QueueItem = typing.Tuple[pathlib.Path, pathlib.Path, Action]
class FileTransferer(metaclass=abc.ABCMeta): class FileTransferer(metaclass=abc.ABCMeta):
"""Interface for file transfer classes.""" """Interface for file transfer classes."""
@abc.abstractmethod class Empty(queue.Empty):
def start(self): """No more files to transfer, but more may be queued later."""
"""Starts the file transfer thread/process.
This could spin up a separate thread to perform the actual file class Done(queue.Empty):
transfer. After start() is called, implementations should still accept """No more files to transfer, work is done."""
calls to the queue_xxx() methods. In other words, this is not to be
used as queue-and-then-start, but as start-and-then-queue.
"""
@abc.abstractmethod def __init__(self) -> None:
def queue_copy(self, src: pathlib.Path, dst: pathlib.Path) -> None: super().__init__()
# For copying in a different process. By using a priority queue the files
# are automatically sorted alphabetically, which means we go through all files
# in a single directory at a time. This should be faster to copy than random
# access. The order isn't guaranteed, though, as we're not waiting around for
# all file paths to be known before copying starts.
# maxsize=100 is just a guess as to a reasonable upper limit. When this limit
# is reached, the main thread will simply block while waiting for this thread
# to finish copying a file.
self.queue = queue.PriorityQueue(maxsize=100) # type: queue.PriorityQueue[QueueItem]
self.done = threading.Event()
def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a copy action from 'src' to 'dst'.""" """Queue a copy action from 'src' to 'dst'."""
self.queue.put((src, dst, Action.COPY))
@abc.abstractmethod def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
def queue_move(self, src: pathlib.Path, dst: pathlib.Path) -> None:
"""Queue a move action from 'src' to 'dst'.""" """Queue a move action from 'src' to 'dst'."""
self.queue.put((src, dst, Action.MOVE))
@abc.abstractmethod
def done_and_join(self) -> None: def done_and_join(self) -> None:
"""Indicate all files have been queued, and wait until done. """Indicate all files have been queued, and wait until done.
@ -42,3 +63,46 @@ class FileTransferer(metaclass=abc.ABCMeta):
:raises FileTransferError: if there was an error transferring one or :raises FileTransferError: if there was an error transferring one or
more files. more files.
""" """
self.done.set()
self.join()
if not self.queue.empty():
# Flush the queue so that we can report which files weren't copied yet.
files_remaining = []
while not self.queue.empty():
src, dst, act = self.queue.get_nowait()
files_remaining.append(src)
assert files_remaining
raise FileTransferError(
"%d files couldn't be transferred" % len(files_remaining),
files_remaining)
def pop_queued(self) -> typing.Optional[QueueItem]:
"""Pops an item off the queue, waiting 0.1 sec if the queue is empty.
:raises Done: when all files have been handled, and the work is done.
:raises Empty: when the queue is empty, but more files may be queued
in the future.
"""
try:
return self.queue.get(timeout=0.1)
except queue.Empty:
if self.done.is_set():
raise self.Done()
raise self.Empty()
@abc.abstractmethod
def start(self) -> None:
"""Starts the file transfer thread/process.
This could spin up a separate thread to perform the actual file
transfer. After start() is called, implementations should still accept
calls to the queue_xxx() methods. In other words, this is not to be
used as queue-and-then-start, but as start-and-then-queue.
"""
@abc.abstractmethod
def join(self, timeout=None):
"""Wait for the thread/process to stop."""