Moved file copy queue to separate module.
Also transformed it into a threading.Thread subclass, rather than using a function + module-global state variables.
This commit is contained in:
parent
0df94c74cd
commit
b6527ec1ee
@ -2,34 +2,16 @@ import collections
|
|||||||
import enum
|
import enum
|
||||||
import functools
|
import functools
|
||||||
import logging
|
import logging
|
||||||
import threading
|
|
||||||
import pathlib
|
import pathlib
|
||||||
import queue
|
|
||||||
import shutil
|
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from blender_asset_tracer import tracer, bpathlib, blendfile
|
from blender_asset_tracer import tracer, bpathlib, blendfile
|
||||||
from blender_asset_tracer.cli import common
|
from blender_asset_tracer.cli import common
|
||||||
from blender_asset_tracer.tracer import result
|
from blender_asset_tracer.tracer import result
|
||||||
|
from . import queued_copy
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
# For copying in a different process. By using a priority queue the files
|
|
||||||
# are automatically sorted alphabetically, which means we go through all files
|
|
||||||
# in a single directory at a time. This should be faster to copy than random
|
|
||||||
# access. The order isn't guaranteed, though, as we're not waiting around for
|
|
||||||
# all file paths to be known before copying starts.
|
|
||||||
file_copy_queue = queue.PriorityQueue()
|
|
||||||
file_copy_done = threading.Event()
|
|
||||||
|
|
||||||
|
|
||||||
class FileCopyError(IOError):
|
|
||||||
"""Raised when one or more files could not be copied."""
|
|
||||||
|
|
||||||
def __init__(self, message, files_not_copied: typing.List[pathlib.Path]):
|
|
||||||
super().__init__(message)
|
|
||||||
self.files_not_copied = files_not_copied
|
|
||||||
|
|
||||||
|
|
||||||
class PathAction(enum.Enum):
|
class PathAction(enum.Enum):
|
||||||
KEEP_PATH = 1
|
KEEP_PATH = 1
|
||||||
@ -54,7 +36,6 @@ class Packer:
|
|||||||
self.target = target
|
self.target = target
|
||||||
self.noop = noop
|
self.noop = noop
|
||||||
|
|
||||||
self._already_copied = set()
|
|
||||||
self._shorten = functools.partial(common.shorten, self.project)
|
self._shorten = functools.partial(common.shorten, self.project)
|
||||||
|
|
||||||
if noop:
|
if noop:
|
||||||
@ -64,6 +45,9 @@ class Packer:
|
|||||||
self._actions = collections.defaultdict(AssetAction)
|
self._actions = collections.defaultdict(AssetAction)
|
||||||
self._rewrites = collections.defaultdict(list)
|
self._rewrites = collections.defaultdict(list)
|
||||||
|
|
||||||
|
# Number of files we would copy, if not for --noop
|
||||||
|
self._file_count = 0
|
||||||
|
|
||||||
def strategise(self):
|
def strategise(self):
|
||||||
"""Determine what to do with the assets.
|
"""Determine what to do with the assets.
|
||||||
|
|
||||||
@ -158,29 +142,17 @@ class Packer:
|
|||||||
"""
|
"""
|
||||||
log.info('Executing %d copy actions', len(self._actions))
|
log.info('Executing %d copy actions', len(self._actions))
|
||||||
|
|
||||||
t = threading.Thread(target=copy_queued)
|
fc = queued_copy.FileCopier()
|
||||||
if not self.noop:
|
if not self.noop:
|
||||||
t.start()
|
fc.start()
|
||||||
|
|
||||||
for asset_path, action in self._actions.items():
|
for asset_path, action in self._actions.items():
|
||||||
self._copy_asset_and_deps(asset_path, action)
|
self._copy_asset_and_deps(asset_path, action, fc)
|
||||||
|
|
||||||
if self.noop:
|
if self.noop:
|
||||||
log.info('Would copy %d files to %s', len(self._already_copied), self.target)
|
log.info('Would copy %d files to %s', self._file_count, self.target)
|
||||||
return
|
return
|
||||||
|
fc.done_and_join()
|
||||||
file_copy_done.set()
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
if not file_copy_queue.empty():
|
|
||||||
# Flush the queue so that we can report which files weren't copied yet.
|
|
||||||
files_remaining = []
|
|
||||||
while not file_copy_queue.empty():
|
|
||||||
src, dst = file_copy_queue.get_nowait()
|
|
||||||
files_remaining.append(src)
|
|
||||||
assert files_remaining
|
|
||||||
raise FileCopyError("%d files couldn't be copied" % len(files_remaining),
|
|
||||||
files_remaining)
|
|
||||||
|
|
||||||
def _rewrite_paths(self):
|
def _rewrite_paths(self):
|
||||||
"""Rewrite paths to the new location of the assets."""
|
"""Rewrite paths to the new location of the assets."""
|
||||||
@ -229,12 +201,13 @@ class Packer:
|
|||||||
written = block.set(usage.path_full_field.name.name_only, relpath)
|
written = block.set(usage.path_full_field.name.name_only, relpath)
|
||||||
log.info(' - written %d bytes', written)
|
log.info(' - written %d bytes', written)
|
||||||
|
|
||||||
def _copy_asset_and_deps(self, asset_path: pathlib.Path, action: AssetAction):
|
def _copy_asset_and_deps(self, asset_path: pathlib.Path, action: AssetAction,
|
||||||
|
fc: queued_copy.FileCopier):
|
||||||
log.debug('Queueing copy of %s and dependencies', asset_path)
|
log.debug('Queueing copy of %s and dependencies', asset_path)
|
||||||
|
|
||||||
# Copy the asset itself.
|
# Copy the asset itself.
|
||||||
packed_path = self._actions[asset_path].new_path
|
packed_path = self._actions[asset_path].new_path
|
||||||
self._copy_to_target(asset_path, packed_path)
|
self._copy_to_target(asset_path, packed_path, fc)
|
||||||
|
|
||||||
# Copy its sequence dependencies.
|
# Copy its sequence dependencies.
|
||||||
for usage in action.usages:
|
for usage in action.usages:
|
||||||
@ -250,57 +223,14 @@ class Packer:
|
|||||||
packed_base_dir = first_pp.parent
|
packed_base_dir = first_pp.parent
|
||||||
for file_path in usage.files():
|
for file_path in usage.files():
|
||||||
packed_path = packed_base_dir / file_path.name
|
packed_path = packed_base_dir / file_path.name
|
||||||
self._copy_to_target(file_path, packed_path)
|
self._copy_to_target(file_path, packed_path, fc)
|
||||||
|
|
||||||
# Assumption: all data blocks using this asset use it the same way.
|
# Assumption: all data blocks using this asset use it the same way.
|
||||||
break
|
break
|
||||||
|
|
||||||
def _copy_to_target(self, asset_path: pathlib.Path, target: pathlib.Path):
|
def _copy_to_target(self, asset_path: pathlib.Path, target: pathlib.Path, fc):
|
||||||
if self.noop:
|
if self.noop:
|
||||||
print('%s → %s' % (asset_path, target))
|
print('%s → %s' % (asset_path, target))
|
||||||
|
self._file_count += 1
|
||||||
return
|
return
|
||||||
file_copy_queue.put((asset_path, target))
|
fc.queue(asset_path, target)
|
||||||
|
|
||||||
|
|
||||||
def copy_queued():
|
|
||||||
my_log = log.getChild('copy_queued')
|
|
||||||
files_copied = 0
|
|
||||||
files_skipped = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
src, dst = file_copy_queue.get(timeout=0.1)
|
|
||||||
except queue.Empty:
|
|
||||||
if file_copy_done.is_set():
|
|
||||||
break
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
if dst.exists():
|
|
||||||
st_src = src.stat()
|
|
||||||
st_dst = dst.stat()
|
|
||||||
if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime:
|
|
||||||
my_log.info('Skipping %s; already exists', src)
|
|
||||||
files_skipped += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
my_log.info('Copying %s → %s', src, dst)
|
|
||||||
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
# TODO(Sybren): when we target Py 3.6+, remove the str() calls.
|
|
||||||
shutil.copy(str(src), str(dst))
|
|
||||||
files_copied += 1
|
|
||||||
except Exception:
|
|
||||||
# We have to catch exceptions in a broad way, as this is running in
|
|
||||||
# a separate thread, and exceptions won't otherwise be seen.
|
|
||||||
my_log.exception('Error copying %s to %s', src, dst)
|
|
||||||
# Put the files to copy back into the queue, and abort. This allows
|
|
||||||
# the main thread to inspect the queue and see which files were not
|
|
||||||
# copied. The one we just failed (due to this exception) should also
|
|
||||||
# be reported there.
|
|
||||||
file_copy_queue.put((src, dst))
|
|
||||||
return
|
|
||||||
|
|
||||||
if files_copied:
|
|
||||||
my_log.info('Copied %d files', files_copied)
|
|
||||||
if files_skipped:
|
|
||||||
my_log.info('Skipped %d files', files_skipped)
|
|
||||||
|
|||||||
97
blender_asset_tracer/pack/queued_copy.py
Normal file
97
blender_asset_tracer/pack/queued_copy.py
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
import pathlib
|
||||||
|
import queue
|
||||||
|
import shutil
|
||||||
|
import typing
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)


class FileCopyError(IOError):
    """Raised when one or more files could not be copied.

    The source paths of the files that were not copied are available in
    the `files_not_copied` attribute.
    """

    def __init__(self, message, files_not_copied: typing.List[pathlib.Path]):
        super().__init__(message)
        self.files_not_copied = files_not_copied


class FileCopier(threading.Thread):
    """Copies files in directory order.

    Typical use: start() the thread, queue() every file that has to be
    copied, then call done_and_join() to wait for the copying to finish.
    done_and_join() raises FileCopyError when not all files were copied.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # For copying in a different thread. By using a priority queue the files
        # are automatically sorted alphabetically, which means we go through all files
        # in a single directory at a time. This should be faster to copy than random
        # access. The order isn't guaranteed, though, as we're not waiting around for
        # all file paths to be known before copying starts.

        # maxsize=100 is just a guess as to a reasonable upper limit. When this limit
        # is reached, the main thread will simply block while waiting for this thread
        # to finish copying a file.
        self.file_copy_queue = queue.PriorityQueue(maxsize=100)
        self.file_copy_done = threading.Event()

        # Set by the copy thread when it aborts due to an error. From that
        # moment on nothing consumes the queue any more, so queue() must not
        # block on it.
        self._copy_failed = threading.Event()

        # Source paths that are known not to be copied. The copy thread appends
        # the failed file *before* setting _copy_failed and then stops; the
        # queueing thread only appends *after* seeing _copy_failed, so the list
        # is never modified by both threads at the same time.
        self._not_copied = []  # type: typing.List[pathlib.Path]

    def queue(self, src: pathlib.Path, dst: pathlib.Path):
        """Queue a copy action from 'src' to 'dst'.

        Blocks while the queue is full and the copy thread is still working.
        Once the copy thread has aborted due to an error, the file is not
        queued but recorded as 'not copied', and this function returns
        immediately.
        """
        while not self._copy_failed.is_set():
            try:
                # Use a timeout so that we regularly re-check whether the copy
                # thread is still alive; an untimed put() on a full queue would
                # hang forever after the consumer has stopped.
                self.file_copy_queue.put((src, dst), timeout=0.1)
            except queue.Full:
                continue
            return

        self._not_copied.append(src)

    def done_and_join(self):
        """Indicate all files have been queued, and wait until done.

        :raises FileCopyError: when one or more files could not be copied.
            Its `files_not_copied` attribute lists the source paths, sorted.
        """

        self.file_copy_done.set()
        self.join()

        # The copy thread has stopped, so everything below is single-threaded.
        files_remaining = list(self._not_copied)

        # Flush the queue so that we can report which files weren't copied yet.
        while True:
            try:
                src, _ = self.file_copy_queue.get_nowait()
            except queue.Empty:
                break
            files_remaining.append(src)

        if not files_remaining:
            return

        files_remaining.sort()
        raise FileCopyError("%d files couldn't be copied" % len(files_remaining),
                            files_remaining)

    def run(self):
        """Copy queued files until done_and_join() is called or a copy fails."""
        files_copied = 0
        files_skipped = 0

        while True:
            try:
                src, dst = self.file_copy_queue.get(timeout=0.1)
            except queue.Empty:
                # Only stop on an empty queue when the main thread told us
                # that no more files will be queued.
                if self.file_copy_done.is_set():
                    break
                continue

            try:
                if dst.exists():
                    st_src = src.stat()
                    st_dst = dst.stat()
                    # Same size and not older than the source: assume it's
                    # the same file and don't copy it again.
                    if st_dst.st_size == st_src.st_size and st_dst.st_mtime >= st_src.st_mtime:
                        log.info('Skipping %s; already exists', src)
                        files_skipped += 1
                        continue

                log.info('Copying %s → %s', src, dst)
                dst.parent.mkdir(parents=True, exist_ok=True)
                # TODO(Sybren): when we target Py 3.6+, remove the str() calls.
                shutil.copy(str(src), str(dst))
                files_copied += 1
            except Exception:
                # We have to catch exceptions in a broad way, as this is running in
                # a separate thread, and exceptions won't otherwise be seen.
                log.exception('Error copying %s to %s', src, dst)
                # Record the file we just failed on and abort. It is not put
                # back into the bounded queue, as that could block forever when
                # the main thread has filled the queue in the mean time. The
                # files still in the queue are reported by done_and_join().
                self._not_copied.append(src)
                self._copy_failed.set()
                return

        if files_copied:
            log.info('Copied %d files', files_copied)
        if files_skipped:
            log.info('Skipped %d files', files_skipped)
|
||||||
Loading…
x
Reference in New Issue
Block a user