Multi-threaded compressed file transfers

For regular file transfers (i.e. to a directory, not to a ZIP file or S3
storage), use multi-threaded transfer when compressing. Compressing is
CPU-bound, so using multiple threads speeds things up considerably
(packing a Spring lighting file went from 6min30 single-threaded to
2min13 multi-threaded on my machine).
This commit is contained in:
Sybren A. Stüvel 2018-11-27 15:50:20 +01:00
parent b6c0d01e45
commit 33512d42cf

View File

@ -18,7 +18,10 @@
#
# (c) 2018, Blender Foundation - Sybren A. Stüvel
import logging
import multiprocessing.pool
import os
import pathlib
import queue
import shutil
import threading
import typing
@ -28,60 +31,151 @@ from . import transfer
log = logging.getLogger(__name__)
class FileCopierPool(multiprocessing.pool.ThreadPool):
    """Thread pool whose apply_async() blocks while all threads are in use.

    The default (Thread)Pool class will always have a non-blocking
    apply_async() function. This means that while we're looping over
    the queue of files to copy, they are all popped off that queue
    and turned into async tasks. That means that when there is an
    error, the queue is empty and we don't have a reliable
    "these files were not transferred" list.

    This class limits the number of in-flight tasks (queued or running) to
    the number of worker threads: apply_async() waits for a free slot
    before it hands the task to the pool.

    Only apply_async() (and apply(), which is built on top of it) is
    throttled; the map-style functions behave like the regular ThreadPool.
    """

    def __init__(self, processes=None, initializer=None, initargs=()):
        """Create the pool.

        :param processes: number of worker threads; None means one per CPU.
        :param initializer: optional callable run at the start of each worker.
        :param initargs: arguments passed to `initializer`.
        :raises ValueError: when `processes` is smaller than 1.
        """
        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        super().__init__(processes, initializer, initargs)

        # One slot per worker thread. Replacing the pool's internal queues
        # after construction does not work, because the handler threads
        # started by the parent constructor already hold references to the
        # original queues; tasks put on a replacement queue would never run.
        self._free_slots = threading.BoundedSemaphore(processes)

    def apply_async(self, func, args=(), kwds=None, callback=None, error_callback=None):
        """Like ThreadPool.apply_async(), but blocks until a thread is free.

        The slot is released when the task finishes, regardless of whether
        it succeeded or raised, just before the user-supplied callback runs.
        """
        self._free_slots.acquire()
        try:
            return super().apply_async(
                func,
                args,
                {} if kwds is None else kwds,
                callback=self._releasing(callback),
                error_callback=self._releasing(error_callback),
            )
        except BaseException:
            # The task was never submitted (e.g. the pool is closed), so
            # no callback will ever give the slot back.
            self._free_slots.release()
            raise

    def _releasing(self, callback):
        """Wrap `callback` so that it frees a task slot before being called."""

        def release_then_call(value):
            self._free_slots.release()
            if callback is not None:
                callback(value)

        return release_then_call
class AbortTransfer(Exception):
    """Signals that an error was detected and the file transfer must stop.

    Raised internally once the error or abort flag is set, so that the
    item being handled can be put back on the queue as "not transferred".
    """
class FileCopier(transfer.FileTransferer):
"""Copies or moves files in source directory order."""
# When we don't compress the files, the process is I/O bound,
# and thrashing the storage by using multiple threads will
# only slow things down.
transfer_threads = 1 # type: typing.Optional[int]
def __init__(self):
    """Set up the transfer counters and the transfer function table."""
    super().__init__()
    # Counters that run() reports once the transfer has finished.
    self.files_transferred = 0
    self.files_skipped = 0
    # NOTE(review): presumably the set of paths that were already
    # copied and can be skipped later on -- confirm against copytree().
    self.already_copied = set()

    # Maps (is_dir, action) to the bound method that performs the
    # transfer. Stored on the instance because _thread() looks it up
    # as self.transfer_funcs.
    self.transfer_funcs = {
        (False, transfer.Action.COPY): self.copyfile,
        (True, transfer.Action.COPY): self.copytree,
        (False, transfer.Action.MOVE): self.move,
        (True, transfer.Action.MOVE): self.move,
    }
def run(self) -> None:
    """Transfer all queued files, using a pool of `transfer_threads` threads.

    For every (src, dst, action) item from iter_queue() this skips the
    item when _skip_file() says so, creates the destination's parent
    directory, and hands the actual transfer to _thread() via the pool.

    Items that cannot be handled, because an error/abort was flagged or
    because queueing them failed, are put back on self.queue so that the
    caller can see which files were not transferred.
    """
    pool = multiprocessing.pool.ThreadPool(processes=self.transfer_threads)

    for src, dst, act in self.iter_queue():
        try:
            if self._error.is_set() or self._abort.is_set():
                raise AbortTransfer()

            if self._skip_file(src, dst, act):
                continue

            # We want to do this in this thread, as it's not thread safe itself.
            dst.parent.mkdir(parents=True, exist_ok=True)

            pool.apply_async(self._thread, (src, dst, act))
        except AbortTransfer:
            # either self._error or self._abort is already set. We just have to
            # let the system know we didn't handle those files yet.
            self.queue.put((src, dst, act), timeout=1.0)
        except Exception as ex:
            # We have to catch exceptions in a broad way, as this is running in
            # a separate thread, and exceptions won't otherwise be seen.
            if self._abort.is_set():
                log.debug('Error transferring %s to %s: %s', src, dst, ex)
            else:
                log.exception('Error transferring %s to %s', src, dst)
                self._error.set()
            # Put the files to copy back into the queue, and abort. This allows
            # the main thread to inspect the queue and see which files were not
            # copied. The one we just failed (due to this exception) should also
            # be reported there.
            self.queue.put((src, dst, act), timeout=1.0)
            break

    log.debug('All transfer threads queued')
    pool.close()
    log.debug('Waiting for transfer threads to finish')
    pool.join()
    log.debug('All transfer threads finished')

    if self.files_transferred:
        log.info('Transferred %d files', self.files_transferred)
    if self.files_skipped:
        log.info('Skipped %d files', self.files_skipped)
def _thread(self, src: pathlib.Path, dst: pathlib.Path, act: transfer.Action):
    """Transfer a single queued item; run() submits this to the thread pool.

    Looks up the transfer function for (src.is_dir(), act) in
    self.transfer_funcs and calls it. Exceptions never propagate out of
    this function: on failure or abort, the item is put back on
    self.queue so that it is reported as "not transferred".

    :param src: path to transfer from.
    :param dst: path to transfer to.
    :param act: the transfer action, used as key into self.transfer_funcs.
    """
    try:
        tfunc = self.transfer_funcs[src.is_dir(), act]

        if self._error.is_set() or self._abort.is_set():
            raise AbortTransfer()

        # Separate the two paths; without a separator the source and
        # destination run together into one unreadable path.
        log.info('%s %s -> %s', act.name, src, dst)
        tfunc(src, dst)
    except AbortTransfer:
        # either self._error or self._abort is already set. We just have to
        # let the system know we didn't handle those files yet.
        self.queue.put((src, dst, act), timeout=1.0)
    except Exception as ex:
        # We have to catch exceptions in a broad way, as this is running in
        # a separate thread, and exceptions won't otherwise be seen.
        if self._abort.is_set():
            log.debug('Error transferring %s to %s: %s', src, dst, ex)
        else:
            log.exception('Error transferring %s to %s', src, dst)
            self._error.set()
        # Put the files to copy back into the queue, and abort. This allows
        # the main thread to inspect the queue and see which files were not
        # copied. The one we just failed (due to this exception) should also
        # be reported there.
        self.queue.put((src, dst, act), timeout=1.0)
def _skip_file(self, src: pathlib.Path, dst: pathlib.Path, act: transfer.Action) -> bool:
    """Tell whether transferring `src` to `dst` can be skipped.

    The transfer is skipped when `dst` already exists with the same size
    as `src` and a modification time that is not older. When skipping a
    MOVE action, `src` is deleted. Every skip is counted in
    self.files_skipped.

    :return: True when the file was skipped, False when it must be transferred.
    """
    # The source must exist, or it wouldn't be queued.
    src_stat = src.stat()

    if not dst.exists():
        return False

    dst_stat = dst.stat()
    same_size = dst_stat.st_size == src_stat.st_size
    dst_is_current = dst_stat.st_mtime >= src_stat.st_mtime
    if not (same_size and dst_is_current):
        return False

    log.info('SKIP %s; already exists', src)
    if act == transfer.Action.MOVE:
        log.debug('Deleting %s', src)
        src.unlink()

    self.files_skipped += 1
    return True
def _move(self, srcpath: pathlib.Path, dstpath: pathlib.Path):
    """Move `srcpath` to `dstpath` on the filesystem, without any bookkeeping."""
    # shutil.move() is handed plain strings rather than Path objects.
    source, target = str(srcpath), str(dstpath)
    shutil.move(source, target)
@ -140,9 +234,15 @@ class FileCopier(transfer.FileTransferer):
log.debug('SKIP %s; already copied', src)
return
if self._error.is_set() or self._abort.is_set():
raise AbortTransfer()
dst.mkdir(parents=True, exist_ok=True)
errors = [] # type: typing.List[typing.Tuple[pathlib.Path, pathlib.Path, str]]
for srcpath in src.iterdir():
if self._error.is_set() or self._abort.is_set():
raise AbortTransfer()
dstpath = dst / srcpath.name
try:
if srcpath.is_symlink():
@ -188,6 +288,11 @@ class FileCopier(transfer.FileTransferer):
class CompressedFileCopier(FileCopier):
# When we compress the files on the fly, the process is CPU-bound
# so we benefit greatly by multi-threading (packing a Spring scene
# lighting file took 6m30s single-threaded and 2min13 multi-threaded).
transfer_threads = None # type: typing.Optional[int]
def _move(self, srcpath: pathlib.Path, dstpath: pathlib.Path):
    """Low-level file move, delegated to compressor.move().

    NOTE(review): presumably this compresses the file while moving it,
    as the class comment describes -- confirm in the compressor module.
    """
    compressor.move(srcpath, dstpath)