Sybren A. Stüvel 0778b3a6ec Defined file transferer abstract base class
This will allow for different file transfer classes uploading to different
services.
2018-03-09 13:04:14 +01:00

115 lines
4.1 KiB
Python

import enum
import logging
import threading
import pathlib
import queue
import shutil
import typing
from . import transfer
log = logging.getLogger(__name__)
class Action(enum.Enum):
COPY = 1
MOVE = 2
class FileCopier(threading.Thread, transfer.FileTransferer):
"""Copies or moves files in source directory order."""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
# For copying in a different process. By using a priority queue the files
# are automatically sorted alphabetically, which means we go through all files
# in a single directory at a time. This should be faster to copy than random
# access. The order isn't guaranteed, though, as we're not waiting around for
# all file paths to be known before copying starts.
# maxsize=100 is just a guess as to a reasonable upper limit. When this limit
# is reached, the main thread will simply block while waiting for this thread
# to finish copying a file.
self.queue = queue.PriorityQueue(maxsize=100) \
# type: queue.PriorityQueue[typing.Tuple[pathlib.Path, pathlib.Path, Action]]
self.done = threading.Event()
def queue_copy(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a copy action from 'src' to 'dst'."""
self.queue.put((src, dst, Action.COPY))
def queue_move(self, src: pathlib.Path, dst: pathlib.Path):
"""Queue a move action from 'src' to 'dst'."""
self.queue.put((src, dst, Action.MOVE))
def done_and_join(self):
"""Indicate all files have been queued, and wait until done."""
self.done.set()
self.join()
if not self.queue.empty():
# Flush the queue so that we can report which files weren't copied yet.
files_remaining = []
while not self.queue.empty():
src, dst = self.queue.get_nowait()
files_remaining.append(src)
assert files_remaining
raise transfer.FileTransferError(
"%d files couldn't be transferred" % len(files_remaining),
files_remaining)
def run(self):
files_transferred = 0
files_skipped = 0
transfer_funcs = {
Action.COPY: shutil.copy,
Action.MOVE: shutil.move,
}
while True:
try:
src, dst, act = self.queue.get(timeout=0.1)
except queue.Empty:
if self.done.is_set():
break
continue
try:
if dst.exists():
st_src = src.stat()
st_dst = dst.stat()
if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime:
log.info('SKIP %s; already exists', src)
if act == Action.MOVE:
log.debug('Deleting %s', src)
src.unlink()
files_skipped += 1
continue
log.info('%s %s%s', act.name, src, dst)
dst.parent.mkdir(parents=True, exist_ok=True)
# TODO(Sybren): when we target Py 3.6+, remove the str() calls.
tfunc = transfer_funcs[act]
tfunc(str(src), str(dst))
files_transferred += 1
except Exception:
# We have to catch exceptions in a broad way, as this is running in
# a separate thread, and exceptions won't otherwise be seen.
log.exception('Error transferring %s to %s', src, dst)
# Put the files to copy back into the queue, and abort. This allows
# the main thread to inspect the queue and see which files were not
# copied. The one we just failed (due to this exception) should also
# be reported there.
self.queue.put((src, dst, act))
return
if files_transferred:
log.info('Transferred %d files', files_transferred)
if files_skipped:
log.info('Skipped %d files', files_skipped)