File I/O improvements

- Tracer now iterates over blocks in disk order.
- Packer copies files per directory, in a separate thread.
- Packer only copies files that don't exist yet at the target; an existing file is skipped when it has the same size and is not older than the source.
- Packer also copies file permissions.
This commit is contained in:
Sybren A. Stüvel 2018-03-08 12:31:37 +01:00
parent b5418e1f5f
commit 08f538d8c8
3 changed files with 96 additions and 14 deletions

View File

@ -33,7 +33,13 @@ def cli_pack(args):
bpath, ppath, tpath = paths_from_cli(args) bpath, ppath, tpath = paths_from_cli(args)
packer = pack.Packer(bpath, ppath, tpath, args.noop) packer = pack.Packer(bpath, ppath, tpath, args.noop)
packer.strategise() packer.strategise()
try:
packer.execute() packer.execute()
except pack.FileCopyError as ex:
log.error("%d files couldn't be copied, starting with %s",
len(ex.files_not_copied), ex.files_not_copied[0])
raise SystemExit(1)
def paths_from_cli(args) -> (pathlib.Path, pathlib.Path, pathlib.Path): def paths_from_cli(args) -> (pathlib.Path, pathlib.Path, pathlib.Path):

View File

@ -2,7 +2,9 @@ import collections
import enum import enum
import functools import functools
import logging import logging
import threading
import pathlib import pathlib
import queue
import shutil import shutil
import typing import typing
@ -12,6 +14,22 @@ from blender_asset_tracer.tracer import result
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# For copying in a different thread. By using a priority queue the files
# are automatically sorted alphabetically, which means we go through all files
# in a single directory at a time. This should be faster to copy than random
# access. The order isn't guaranteed, though, as we're not waiting around for
# all file paths to be known before copying starts.
file_copy_queue = queue.PriorityQueue()
file_copy_done = threading.Event()
class FileCopyError(IOError):
    """Raised when one or more files could not be copied."""

    def __init__(self, message, files_not_copied: typing.List[pathlib.Path]) -> None:
        """Store the error message and the paths that were not copied.

        :param message: human-readable description, passed on to IOError.
        :param files_not_copied: source paths of the files that could not
            be copied; kept on the exception for callers to inspect.
        """
        super().__init__(message)
        # Source paths that were still waiting to be copied when copying
        # was aborted.
        self.files_not_copied = files_not_copied
class PathAction(enum.Enum): class PathAction(enum.Enum):
KEEP_PATH = 1 KEEP_PATH = 1
@ -46,8 +64,6 @@ class Packer:
self._actions = collections.defaultdict(AssetAction) self._actions = collections.defaultdict(AssetAction)
self._rewrites = collections.defaultdict(list) self._rewrites = collections.defaultdict(list)
self._copy_cache_miss = self._copy_cache_hit = 0
def strategise(self): def strategise(self):
"""Determine what to do with the assets. """Determine what to do with the assets.
@ -134,7 +150,6 @@ class Packer:
self._copy_files_to_target() self._copy_files_to_target()
if not self.noop: if not self.noop:
self._rewrite_paths() self._rewrite_paths()
log.info('Copy cache: %d hits / %d misses', self._copy_cache_miss, self._copy_cache_hit)
def _copy_files_to_target(self): def _copy_files_to_target(self):
"""Copy all assets to the target directoy. """Copy all assets to the target directoy.
@ -142,14 +157,30 @@ class Packer:
This creates the BAT Pack but does not yet do any path rewriting. This creates the BAT Pack but does not yet do any path rewriting.
""" """
log.info('Executing %d copy actions', len(self._actions)) log.info('Executing %d copy actions', len(self._actions))
t = threading.Thread(target=copy_queued)
if not self.noop:
t.start()
for asset_path, action in self._actions.items(): for asset_path, action in self._actions.items():
self._copy_asset_and_deps(asset_path, action) self._copy_asset_and_deps(asset_path, action)
if self.noop: if self.noop:
msg = 'Would copy' log.info('Would copy %d files to %s', len(self._already_copied), self.target)
else: return
msg = 'Copied'
log.info('%s %d files to %s', msg, len(self._already_copied), self.target) file_copy_done.set()
t.join()
if not file_copy_queue.empty():
# Flush the queue so that we can report which files weren't copied yet.
files_remaining = []
while not file_copy_queue.empty():
src, dst = file_copy_queue.get_nowait()
files_remaining.append(src)
assert files_remaining
raise FileCopyError("%d files couldn't be copied" % len(files_remaining),
files_remaining)
def _rewrite_paths(self): def _rewrite_paths(self):
"""Rewrite paths to the new location of the assets.""" """Rewrite paths to the new location of the assets."""
@ -199,7 +230,7 @@ class Packer:
log.info(' - written %d bytes', written) log.info(' - written %d bytes', written)
def _copy_asset_and_deps(self, asset_path: pathlib.Path, action: AssetAction): def _copy_asset_and_deps(self, asset_path: pathlib.Path, action: AssetAction):
log.info('Copying %s and dependencies', asset_path) log.debug('Queueing copy of %s and dependencies', asset_path)
# Copy the asset itself. # Copy the asset itself.
packed_path = self._actions[asset_path].new_path packed_path = self._actions[asset_path].new_path
@ -225,10 +256,51 @@ class Packer:
break break
def _copy_to_target(self, asset_path: pathlib.Path, target: pathlib.Path): def _copy_to_target(self, asset_path: pathlib.Path, target: pathlib.Path):
print('%s%s' % (asset_path, target))
if self.noop: if self.noop:
print('%s%s' % (asset_path, target))
return
file_copy_queue.put((asset_path, target))
def copy_queued():
    """Copy queued (source, destination) pairs until told to stop.

    Takes ``(src, dst)`` path pairs from the module-level ``file_copy_queue``
    and copies each one with ``shutil.copy``. The loop ends once the queue is
    empty and ``file_copy_done`` has been set.

    A destination that already exists is skipped when it has the same size
    as the source and its modification time is not older than the source's.

    On the first error the failing pair is put back into the queue and the
    function returns immediately, so that whoever inspects the queue
    afterwards can see every file that was not copied.
    """
    my_log = log.getChild('copy_queued')
    files_copied = 0
    files_skipped = 0

    while True:
        try:
            # Use a short timeout so the 'done' flag is checked regularly
            # while the queue is empty.
            src, dst = file_copy_queue.get(timeout=0.1)
        except queue.Empty:
            if file_copy_done.is_set():
                break
            continue

        try:
            if dst.exists():
                st_src = src.stat()
                st_dst = dst.stat()
                if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime:
                    my_log.info('Skipping %s; already exists', src)
                    files_skipped += 1
                    continue

            my_log.info('Copying %s -> %s', src, dst)
            dst.parent.mkdir(parents=True, exist_ok=True)
            # TODO(Sybren): when we target Py 3.6+, remove the str() calls.
            shutil.copy(str(src), str(dst))
            files_copied += 1
        except Exception:
            # We have to catch exceptions in a broad way, as this is running in
            # a separate thread, and exceptions won't otherwise be seen.
            my_log.exception('Error copying %s to %s', src, dst)
            # Put the files to copy back into the queue, and abort. This allows
            # the main thread to inspect the queue and see which files were not
            # copied. The one we just failed (due to this exception) should also
            # be reported there.
            file_copy_queue.put((src, dst))
            return

    if files_copied:
        my_log.info('Copied %d files', files_copied)
    if files_skipped:
        my_log.info('Skipped %d files', files_skipped)

View File

@ -24,7 +24,11 @@ def deps(bfilepath: pathlib.Path) -> typing.Iterator[result.BlockUsage]:
""" """
bfile = blendfile.open_cached(bfilepath) bfile = blendfile.open_cached(bfilepath)
for block in asset_holding_blocks(file2blocks.iter_blocks(bfile)):
# Sort the asset-holding blocks so that we can iterate over them
# in disk order, which is slightly faster than random order.
ahb = asset_holding_blocks(file2blocks.iter_blocks(bfile))
for block in sorted(ahb):
yield from blocks2assets.iter_assets(block) yield from blocks2assets.iter_assets(block)