blender_asset_tracer/pack/shaman/transfer.py

360 lines
15 KiB
Python

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
#
# (c) 2019, Blender Foundation - Sybren A. Stüvel
import collections
import logging
import pathlib
import random
import typing
import requests
import blender_asset_tracer.pack.transfer as bat_transfer
from blender_asset_tracer import bpathlib
MAX_DEFERRED_PATHS = 8
MAX_FAILED_PATHS = 8
response_file_unknown = "file-unknown"
response_already_uploading = "already-uploading"
class FileInfo:
def __init__(self, checksum: str, filesize: int, abspath: pathlib.Path):
self.checksum = checksum
self.filesize = filesize
self.abspath = abspath
class ShamanTransferrer(bat_transfer.FileTransferer):
"""Sends files to a Shaman server."""
class AbortUpload(Exception):
"""Raised from the upload callback to abort an upload."""
def __init__(self, auth_token: str, project_root: pathlib.Path,
shaman_endpoint: str, checkout_id: str) -> None:
from . import client
super().__init__()
self.client = client.ShamanClient(auth_token, shaman_endpoint)
self.project_root = project_root
self.checkout_id = checkout_id
self.log = logging.getLogger(__name__)
self._file_info = {} # type: typing.Dict[str, FileInfo]
# When the Shaman creates a checkout, it'll return the location of that
# checkout. This can then be combined with the project-relative path
# of the to-be-rendered blend file (e.g. the one 'bat pack' was pointed
# at).
self._checkout_location = ''
self.uploaded_files = 0
self.uploaded_bytes = 0
# noinspection PyBroadException
def run(self) -> None:
try:
self.uploaded_files = 0
self.uploaded_bytes = 0
# Construct the Shaman Checkout Definition file.
# This blocks until we know the entire list of files to transfer.
definition_file, allowed_relpaths, delete_when_done = self._create_checkout_definition()
if not definition_file:
# An error has already been logged.
return
self.log.info('Created checkout definition file of %d KiB',
len(definition_file) // 1024)
self.log.info('Feeding %d files to the Shaman', len(self._file_info))
if self.log.isEnabledFor(logging.INFO):
for path in self._file_info:
self.log.info(' - %s', path)
# Try to upload all the files.
failed_paths = set() # type: typing.Set[str]
max_tries = 50
for try_index in range(max_tries):
# Send the file to the Shaman and see what we still need to send there.
to_upload = self._send_checkout_def_to_shaman(definition_file, allowed_relpaths)
if to_upload is None:
# An error has already been logged.
return
if not to_upload:
break
# Send the files that still need to be sent.
self.log.info('Upload attempt %d', try_index + 1)
failed_paths = self._upload_files(to_upload)
if not failed_paths:
break
# Having failed paths at this point is expected when multiple
# clients are sending the same files. Instead of retrying on a
# file-by-file basis, we just re-send the checkout definition
# file to the Shaman and obtain a new list of files to upload.
if failed_paths:
self.log.error('Aborting upload due to too many failures')
self.error_set('Giving up after %d attempts to upload the files' % max_tries)
return
self.log.info('All files uploaded succesfully')
self._request_checkout(definition_file)
# Delete the files that were supposed to be moved.
for src in delete_when_done:
self.delete_file(src)
except Exception as ex:
# We have to catch exceptions in a broad way, as this is running in
# a separate thread, and exceptions won't otherwise be seen.
self.log.exception('Error transferring files to Shaman')
self.error_set('Unexpected exception transferring files to Shaman: %s' % ex)
# noinspection PyBroadException
def _create_checkout_definition(self) \
-> typing.Tuple[bytes, typing.Set[str], typing.List[pathlib.Path]]:
"""Create the checkout definition file for this BAT pack.
:returns: the checkout definition (as bytes), a set of paths in that file,
and list of paths to delete.
If there was an error and file transfer was aborted, the checkout
definition file will be empty.
"""
from . import cache
definition_lines = [] # type: typing.List[bytes]
delete_when_done = [] # type: typing.List[pathlib.Path]
# We keep track of the relative paths we want to send to the Shaman,
# so that the Shaman cannot ask us to upload files we didn't want to.
relpaths = set() # type: typing.Set[str]
for src, dst, act in self.iter_queue():
try:
checksum = cache.compute_cached_checksum(src)
filesize = src.stat().st_size
# relpath = dst.relative_to(self.project_root)
relpath = bpathlib.strip_root(dst).as_posix()
self._file_info[relpath] = FileInfo(
checksum=checksum,
filesize=filesize,
abspath=src,
)
line = '%s %s %s' % (checksum, filesize, relpath)
definition_lines.append(line.encode('utf8'))
relpaths.add(relpath)
if act == bat_transfer.Action.MOVE:
delete_when_done.append(src)
except Exception:
# We have to catch exceptions in a broad way, as this is running in
# a separate thread, and exceptions won't otherwise be seen.
msg = 'Error transferring %s to %s' % (src, dst)
self.log.exception(msg)
# Put the files to copy back into the queue, and abort. This allows
# the main thread to inspect the queue and see which files were not
# copied. The one we just failed (due to this exception) should also
# be reported there.
self.queue.put((src, dst, act))
self.error_set(msg)
return b'', set(), delete_when_done
cache.cleanup_cache()
return b'\n'.join(definition_lines), relpaths, delete_when_done
def _send_checkout_def_to_shaman(self, definition_file: bytes,
allowed_relpaths: typing.Set[str]) \
-> typing.Optional[collections.deque]:
"""Send the checkout definition file to the Shaman.
:return: An iterable of paths (relative to the project root) that still
need to be uploaded, or None if there was an error.
"""
resp = self.client.post('checkout/requirements', data=definition_file, stream=True,
headers={'Content-Type': 'text/plain'},
timeout=15)
if resp.status_code >= 300:
msg = 'Error from Shaman, code %d: %s' % (resp.status_code, resp.text)
self.log.error(msg)
self.error_set(msg)
return None
to_upload = collections.deque() # type: collections.deque
for line in resp.iter_lines():
response, path = line.decode().split(' ', 1)
self.log.debug(' %s: %s', response, path)
if path not in allowed_relpaths:
msg = 'Shaman requested path we did not intend to upload: %r' % path
self.log.error(msg)
self.error_set(msg)
return None
if response == response_file_unknown:
to_upload.appendleft(path)
elif response == response_already_uploading:
to_upload.append(path)
elif response == 'ERROR':
msg = 'Error from Shaman: %s' % path
self.log.error(msg)
self.error_set(msg)
return None
else:
msg = 'Unknown response from Shaman for path %r: %r' % (path, response)
self.log.error(msg)
self.error_set(msg)
return None
return to_upload
def _upload_files(self, to_upload: collections.deque) -> typing.Set[str]:
"""Actually upload the files to Shaman.
Returns the set of files that we did not upload.
"""
failed_paths = set() # type: typing.Set[str]
deferred_paths = set()
def defer(some_path: str):
nonlocal to_upload
self.log.info(' %s deferred (already being uploaded by someone else)', some_path)
deferred_paths.add(some_path)
# Instead of deferring this one file, randomize the files to upload.
# This prevents multiple deferrals when someone else is uploading
# files from the same project (because it probably happens alphabetically).
all_files = list(to_upload)
random.shuffle(all_files)
to_upload = collections.deque(all_files)
if not to_upload:
self.log.info('All %d files are at the Shaman already', len(self._file_info))
self.report_transferred(0)
return failed_paths
self.log.info('Going to upload %d of %d files', len(to_upload), len(self._file_info))
while to_upload:
# After too many failures, just retry to get a fresh set of files to upload.
if len(failed_paths) > MAX_FAILED_PATHS:
self.log.info('Too many failures, going to abort this iteration')
failed_paths.update(to_upload)
return failed_paths
path = to_upload.popleft()
fileinfo = self._file_info[path]
self.log.info(' %s', path)
headers = {
'X-Shaman-Original-Filename': path,
}
# Let the Shaman know whether we can defer uploading this file or not.
can_defer = (len(deferred_paths) < MAX_DEFERRED_PATHS
and path not in deferred_paths
and len(to_upload))
if can_defer:
headers['X-Shaman-Can-Defer-Upload'] = 'true'
url = 'files/%s/%d' % (fileinfo.checksum, fileinfo.filesize)
try:
with fileinfo.abspath.open('rb') as infile:
resp = self.client.post(url, data=infile, headers=headers)
except requests.ConnectionError as ex:
if can_defer:
# Closing the connection with an 'X-Shaman-Can-Defer-Upload: true' header
# indicates that we should defer the upload. Requests doesn't give us the
# reply, even though it was written by the Shaman before it closed the
# connection.
defer(path)
else:
self.log.info(' %s could not be uploaded, might retry later: %s', path, ex)
failed_paths.add(path)
continue
if resp.status_code == 208:
# For small files we get the 208 response, because the server closes the
# connection after we sent the entire request. For bigger files the server
# responds sooner, and Requests gives us the above ConnectionError.
if can_defer:
defer(path)
else:
self.log.info(' %s skipped (already existed on the server)', path)
continue
if resp.status_code >= 300:
msg = 'Error from Shaman uploading %s, code %d: %s' % (
fileinfo.abspath, resp.status_code, resp.text)
self.log.error(msg)
self.error_set(msg)
return failed_paths
failed_paths.discard(path)
self.uploaded_files += 1
file_size = fileinfo.abspath.stat().st_size
self.uploaded_bytes += file_size
self.report_transferred(file_size)
if not failed_paths:
self.log.info('Done uploading %d bytes in %d files',
self.uploaded_bytes, self.uploaded_files)
else:
self.log.info('Uploaded %d bytes in %d files so far',
self.uploaded_bytes, self.uploaded_files)
return failed_paths
def report_transferred(self, bytes_transferred: int):
if self._abort.is_set():
self.log.warning('Interrupting ongoing upload')
raise self.AbortUpload('interrupting ongoing upload')
super().report_transferred(bytes_transferred)
def _request_checkout(self, definition_file: bytes):
"""Ask the Shaman to create a checkout of this BAT pack."""
if not self.checkout_id:
self.log.warning('NOT requesting checkout at Shaman')
return
self.log.info('Requesting checkout at Shaman for checkout_id=%r', self.checkout_id)
resp = self.client.post('checkout/create/%s' % self.checkout_id, data=definition_file,
headers={'Content-Type': 'text/plain'})
if resp.status_code >= 300:
msg = 'Error from Shaman, code %d: %s' % (resp.status_code, resp.text)
self.log.error(msg)
self.error_set(msg)
return
self._checkout_location = resp.text.strip()
self.log.info('Response from Shaman, code %d: %s', resp.status_code, resp.text)
@property
def checkout_location(self) -> str:
"""Returns the checkout location, or '' if no checkout was made."""
if not self._checkout_location:
return ''
return self._checkout_location