diff --git a/Pipfile b/Pipfile index 5f71e25..05b0422 100644 --- a/Pipfile +++ b/Pipfile @@ -6,7 +6,7 @@ name = "pypi" [packages] [dev-packages] -blender-asset-tracer = {editable = true, path = "."} +blender-asset-tracer = {editable = true,path = "."} mypy = "*" pytest = "*" pytest-cov = "*" @@ -19,6 +19,7 @@ sphinx = "*" sphinx-autobuild = "*" sphinx-rtd-theme = "*" twine = "*" +responses = "*" [requires] python_version = "3.5" diff --git a/Pipfile.lock b/Pipfile.lock index 219f277..710b9e4 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "52342a62635f2a04d43bd18f46efee601b648ad7ae99146c17c4b3161de88ae1" + "sha256": "f83b8983e69d5d799b27888b538a4a347eca2d4177ea59052be7ae976bee48d2" }, "pipfile-spec": 6, "requires": { @@ -254,14 +254,6 @@ ], "version": "==19.0" }, - "pathlib2": { - "hashes": [ - "sha256:25199318e8cc3c25dcb45cbe084cc061051336d5a9ea2a12448d3d8cb748f742", - "sha256:5887121d7f7df3603bca2f710e7219f3eca0eb69e0b7cc6e0a022e155ac931a7" - ], - "markers": "python_version < '3.6'", - "version": "==2.3.3" - }, "pathtools": { "hashes": [ "sha256:7c35c5421a39bb82e58018febd90e3b6e5db34c5443aaaf742b3f33d4655f1c0" @@ -399,6 +391,14 @@ ], "version": "==0.9.1" }, + "responses": { + "hashes": [ + "sha256:c85882d2dc608ce6b5713a4e1534120f4a0dc6ec79d1366570d2b0c909a50c87", + "sha256:ea5a14f9aea173e3b786ff04cf03133c2dabd4103dbaef1028742fd71a6c2ad3" + ], + "index": "pypi", + "version": "==0.10.5" + }, "six": { "hashes": [ "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", diff --git a/blender_asset_tracer/cli/pack.py b/blender_asset_tracer/cli/pack.py index c598209..e718b8e 100644 --- a/blender_asset_tracer/cli/pack.py +++ b/blender_asset_tracer/cli/pack.py @@ -37,9 +37,10 @@ def add_parser(subparsers): parser.add_argument('blendfile', type=pathlib.Path, help='The Blend file to pack.') parser.add_argument('target', type=str, - help='The target can be a directory, a ZIP file (does not have to exist ' - 
"yet, just use 'something.zip' as target), or a URL of S3 storage " - '(s3://endpoint/path).') + help="The target can be a directory, a ZIP file (does not have to exist " + "yet, just use 'something.zip' as target), " + "or a URL of S3 storage (s3://endpoint/path) " + "or Shaman storage (shaman://endpoint/#checkoutID).") parser.add_argument('-p', '--project', type=pathlib.Path, help='Root directory of your project. Paths to below this directory are ' @@ -88,6 +89,20 @@ def create_packer(args, bpath: pathlib.Path, ppath: pathlib.Path, target: str) - raise ValueError('S3 uploader does not support the --relative-only option') packer = create_s3packer(bpath, ppath, pathlib.PurePosixPath(target)) + + elif target.startswith('shaman+http:/') or target.startswith('shaman+https:/') \ + or target.startswith('shaman:/'): + if args.noop: + raise ValueError('Shaman uploader does not support no-op.') + + if args.compress: + raise ValueError('Shaman uploader does not support on-the-fly compression') + + if args.relative_only: + raise ValueError('Shaman uploader does not support the --relative-only option') + + packer = create_shamanpacker(bpath, ppath, target) + elif target.lower().endswith('.zip'): from blender_asset_tracer.pack import zipped @@ -122,6 +137,39 @@ def create_s3packer(bpath, ppath, tpath) -> pack.Packer: return s3.S3Packer(bpath, ppath, tpath, endpoint=endpoint) +def create_shamanpacker(bpath: pathlib.Path, ppath: pathlib.Path, tpath: str) -> pack.Packer: + """Creates a package for sending files to a Shaman server. + + URLs should have the form: + shaman://hostname/base/url#jobID + This uses HTTPS to connect to the server. 
To connect using HTTP, use: + shaman+http://hostname/base-url#jobID + """ + + import urllib.parse + from blender_asset_tracer.pack import shaman + + urlparts = urllib.parse.urlparse(str(tpath)) + + if urlparts.scheme in {'shaman', 'shaman+https'}: + scheme = 'https' + elif urlparts.scheme == 'shaman+http': + scheme = 'http' + else: + raise SystemExit('Invalid scheme %r, choose shaman:// or shaman+http://', urlparts.scheme) + + checkout_id = urlparts.fragment + if not checkout_id: + log.warning('No checkout ID given on the URL. Going to send BAT pack to Shaman, ' + 'but NOT creating a checkout') + + new_urlparts = (scheme, *urlparts[1:-1], '') + endpoint = urllib.parse.urlunparse(new_urlparts) + + log.info('Uploading to Shaman server %s with job %s', endpoint, checkout_id) + return shaman.ShamanPacker(bpath, ppath, tpath, endpoint=endpoint, checkout_id=checkout_id) + + def paths_from_cli(args) -> typing.Tuple[pathlib.Path, pathlib.Path, str]: """Return paths to blendfile, project, and pack target. diff --git a/blender_asset_tracer/pack/shaman/__init__.py b/blender_asset_tracer/pack/shaman/__init__.py new file mode 100644 index 0000000..a4b14b9 --- /dev/null +++ b/blender_asset_tracer/pack/shaman/__init__.py @@ -0,0 +1,67 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. Stüvel +"""Shaman Client interface.""" +import logging +import pathlib +import urllib.parse + +import requests + +import blender_asset_tracer.pack as bat_pack +import blender_asset_tracer.pack.transfer as bat_transfer + +log = logging.getLogger(__name__) + + +class ShamanPacker(bat_pack.Packer): + """Creates BAT Packs on a Shaman server.""" + + def __init__(self, + bfile: pathlib.Path, + project: pathlib.Path, + target: str, + endpoint: str, + checkout_id: str, + **kwargs) -> None: + """Constructor + + :param endpoint: URL of the Shaman endpoint. + """ + super().__init__(bfile, project, target, **kwargs) + self.checkout_id = checkout_id + self.shaman_endpoint = endpoint + + def _get_auth_token(self) -> str: + # TODO: get a token from the Flamenco Server. + log.warning('Using temporary hack to get auth token from Shaman') + resp = requests.get(urllib.parse.urljoin(self.shaman_endpoint, 'get-token')) + return resp.text + + def _create_file_transferer(self) -> bat_transfer.FileTransferer: + from . import transfer + + # TODO: pass self._get_auth_token itself, so that the Transferer will be able to + # decide when to get this token (and how many times). 
+ auth_token = self._get_auth_token() + return transfer.ShamanTransferrer(auth_token, self.project, self.shaman_endpoint, + self.checkout_id) + + def _make_target_path(self, target: str) -> pathlib.PurePath: + return pathlib.PurePosixPath('/') diff --git a/blender_asset_tracer/pack/shaman/auth.py b/blender_asset_tracer/pack/shaman/auth.py new file mode 100644 index 0000000..5878e7a --- /dev/null +++ b/blender_asset_tracer/pack/shaman/auth.py @@ -0,0 +1,28 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. 
Stüvel +import requests + +token = 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiIxMjM0NSIsImV4cCI6MTU1MDI0NDUxMiwiaWF0IjoxNTUwMTU4MTEyLCJzdWIiOiJ1c2VyLUlEIn0.oahZHIVBmULFz0JhOjv4-AEN8vdURjGBiIDdZbvW9A2FQWdi0RyrW2KpcHHpKS8KiG81p9pn2bVytMrRJ8Cjmw' + + +def session(): + sess = requests.session() + sess.headers['Authorization'] = 'Bearer ' + token + return sess diff --git a/blender_asset_tracer/pack/shaman/cache.py b/blender_asset_tracer/pack/shaman/cache.py new file mode 100644 index 0000000..fc08257 --- /dev/null +++ b/blender_asset_tracer/pack/shaman/cache.py @@ -0,0 +1,197 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. Stüvel + +import base64 +import hashlib +import json +import logging +import sys +import time +import typing +from collections import deque +from pathlib import Path + +from . import time_tracker + +CACHE_ROOT = Path().home() / '.cache/shaman-client/shasums' +MAX_CACHE_FILES_AGE_SECS = 3600 * 24 * 60 # 60 days + +log = logging.getLogger(__name__) + + +class TimeInfo: + computing_checksums = 0.0 + checksum_cache_handling = 0.0 + + +def find_files(root: Path) -> typing.Iterable[Path]: + """Recursively finds files in the given root path. 
+ + Directories are recursed into, and file paths are yielded. + Symlinks are yielded if they refer to a regular file. + """ + queue = deque([root]) + while queue: + path = queue.popleft() + + # Ignore hidden files/dirs; these can be things like '.svn' or '.git', + # which shouldn't be sent to Shaman. + if path.name.startswith('.'): + continue + + if path.is_dir(): + for child in path.iterdir(): + queue.append(child) + continue + + # Only yield symlinks if they link to (a link to) a normal file. + if path.is_symlink(): + symlinked = path.resolve() + if symlinked.is_file(): + yield path + continue + + if path.is_file(): + yield path + + +def compute_checksum(filepath: Path) -> str: + """Compute the SHA256 checksum for the given file.""" + blocksize = 32 * 1024 + + log.debug('Computing checksum of %s', filepath) + with time_tracker.track_time(TimeInfo, 'computing_checksums'): + hasher = hashlib.sha256() + with filepath.open('rb') as infile: + while True: + block = infile.read(blocksize) + if not block: + break + hasher.update(block) + checksum = hasher.hexdigest() + return checksum + + +def _cache_path(filepath: Path) -> Path: + """Compute the cache file for the given file path.""" + + fs_encoding = sys.getfilesystemencoding() + filepath = filepath.absolute() + + # Reverse the directory, because most variation is in the last bytes. + rev_dir = str(filepath.parent)[::-1] + encoded_path = filepath.stem + rev_dir + filepath.suffix + cache_key = base64.urlsafe_b64encode(encoded_path.encode(fs_encoding)).decode().rstrip('=') + + cache_path = CACHE_ROOT / cache_key[:10] / cache_key[10:] + return cache_path + + +def compute_cached_checksum(filepath: Path) -> str: + """Computes the SHA256 checksum. + + The checksum is cached to disk. If the cache is still valid, it is used to + skip the actual SHA256 computation. 
+ """ + + with time_tracker.track_time(TimeInfo, 'checksum_cache_handling'): + current_stat = filepath.stat() + cache_path = _cache_path(filepath) + + try: + with cache_path.open('r') as cache_file: + payload = json.load(cache_file) + except (OSError, ValueError): + # File may not exist, or have invalid contents. + pass + else: + checksum = payload.get('checksum', '') + cached_mtime = payload.get('file_mtime', 0.0) + cached_size = payload.get('file_size', -1) + + if (checksum + and current_stat.st_size == cached_size + and abs(cached_mtime - current_stat.st_mtime) < 0.01): + cache_path.touch() + return checksum + + checksum = compute_checksum(filepath) + + with time_tracker.track_time(TimeInfo, 'checksum_cache_handling'): + payload = { + 'checksum': checksum, + 'file_mtime': current_stat.st_mtime, + 'file_size': current_stat.st_size, + } + + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + with cache_path.open('w') as cache_file: + json.dump(payload, cache_file) + except IOError as ex: + log.warning('Unable to write checksum cache file %s: %s', cache_path, ex) + + return checksum + + +def cleanup_cache() -> None: + """Remove all cache files that are older than MAX_CACHE_FILES_AGE_SECS.""" + + if not CACHE_ROOT.exists(): + return + + with time_tracker.track_time(TimeInfo, 'checksum_cache_handling'): + queue = deque([CACHE_ROOT]) + rmdir_queue = [] + + now = time.time() + num_removed_files = 0 + num_removed_dirs = 0 + while queue: + path = queue.popleft() + + if path.is_dir(): + queue.extend(path.iterdir()) + rmdir_queue.append(path) + continue + + assert path.is_file() + path.relative_to(CACHE_ROOT) + + age = now - path.stat().st_mtime + # Don't trust files from the future either. 
+ if 0 <= age <= MAX_CACHE_FILES_AGE_SECS: + continue + + path.unlink() + num_removed_files += 1 + + for dirpath in reversed(rmdir_queue): + assert dirpath.is_dir() + dirpath.relative_to(CACHE_ROOT) + + try: + dirpath.rmdir() + num_removed_dirs += 1 + except OSError: + pass + + if num_removed_dirs or num_removed_files: + log.info('Cache Cleanup: removed %d dirs and %d files', num_removed_dirs, num_removed_files) diff --git a/blender_asset_tracer/pack/shaman/client.py b/blender_asset_tracer/pack/shaman/client.py new file mode 100644 index 0000000..76b7dfa --- /dev/null +++ b/blender_asset_tracer/pack/shaman/client.py @@ -0,0 +1,121 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. 
Stüvel + +import urllib.parse + +import requests.adapters + + +class ShamanClient: + """Thin wrapper around a Requests session to perform Shaman requests.""" + + def __init__(self, auth_token: str, base_url: str): + self._auth_token = auth_token + self._base_url = base_url + + http_adapter = requests.adapters.HTTPAdapter(max_retries=5) + self._session = requests.session() + self._session.mount('https://', http_adapter) + self._session.mount('http://', http_adapter) + self._session.headers['Authorization'] = 'Bearer ' + auth_token + + def request(self, method: str, url: str, **kwargs) -> requests.Response: + full_url = urllib.parse.urljoin(self._base_url, url) + return self._session.request(method, full_url, **kwargs) + + def get(self, url, **kwargs): + r"""Sends a GET request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. + :param kwargs: Optional arguments that ``request`` takes. + :rtype: requests.Response + """ + + kwargs.setdefault('allow_redirects', True) + return self.request('GET', url, **kwargs) + + def options(self, url, **kwargs): + r"""Sends a OPTIONS request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. + :param kwargs: Optional arguments that ``request`` takes. + :rtype: requests.Response + """ + + kwargs.setdefault('allow_redirects', True) + return self.request('OPTIONS', url, **kwargs) + + def head(self, url, **kwargs): + r"""Sends a HEAD request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. + :param kwargs: Optional arguments that ``request`` takes. + :rtype: requests.Response + """ + + kwargs.setdefault('allow_redirects', False) + return self.request('HEAD', url, **kwargs) + + def post(self, url, data=None, json=None, **kwargs): + r"""Sends a POST request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. 
+ :param data: (optional) Dictionary, list of tuples, bytes, or file-like + object to send in the body of the :class:`Request`. + :param json: (optional) json to send in the body of the :class:`Request`. + :param kwargs: Optional arguments that ``request`` takes. + :rtype: requests.Response + """ + + return self.request('POST', url, data=data, json=json, **kwargs) + + def put(self, url, data=None, **kwargs): + r"""Sends a PUT request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. + :param data: (optional) Dictionary, list of tuples, bytes, or file-like + object to send in the body of the :class:`Request`. + :param kwargs: Optional arguments that ``request`` takes. + :rtype: requests.Response + """ + + return self.request('PUT', url, data=data, **kwargs) + + def patch(self, url, data=None, **kwargs): + r"""Sends a PATCH request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. + :param data: (optional) Dictionary, list of tuples, bytes, or file-like + object to send in the body of the :class:`Request`. + :param kwargs: Optional arguments that ``request`` takes. + :rtype: requests.Response + """ + + return self.request('PATCH', url, data=data, **kwargs) + + def delete(self, url, **kwargs): + r"""Sends a DELETE request. Returns :class:`Response` object. + + :param url: URL for the new :class:`Request` object. + :param kwargs: Optional arguments that ``request`` takes. 
+ :rtype: requests.Response + """ + + return self.request('DELETE', url, **kwargs) diff --git a/blender_asset_tracer/pack/shaman/time_tracker.py b/blender_asset_tracer/pack/shaman/time_tracker.py new file mode 100644 index 0000000..5279bc3 --- /dev/null +++ b/blender_asset_tracer/pack/shaman/time_tracker.py @@ -0,0 +1,32 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. 
Stüvel +import contextlib +import time +import typing + + +@contextlib.contextmanager +def track_time(tracker_object: typing.Any, attribute: str): + """Context manager, tracks how long the context took to run.""" + start_time = time.monotonic() + yield + duration = time.monotonic() - start_time + tracked_so_far = getattr(tracker_object, attribute, 0.0) + setattr(tracker_object, attribute, tracked_so_far + duration) diff --git a/blender_asset_tracer/pack/shaman/transfer.py b/blender_asset_tracer/pack/shaman/transfer.py new file mode 100644 index 0000000..b74f4f9 --- /dev/null +++ b/blender_asset_tracer/pack/shaman/transfer.py @@ -0,0 +1,340 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. 
Stüvel +import collections +import logging +import pathlib +import random +import typing + +import requests + +import blender_asset_tracer.pack.transfer as bat_transfer + +MAX_DEFERRED_PATHS = 8 +MAX_FAILED_PATHS = 8 + + +class FileInfo: + def __init__(self, checksum: str, filesize: int, abspath: pathlib.Path): + self.checksum = checksum + self.filesize = filesize + self.abspath = abspath + + +class ShamanTransferrer(bat_transfer.FileTransferer): + """Sends files to a Shaman server.""" + + class AbortUpload(Exception): + """Raised from the upload callback to abort an upload.""" + + def __init__(self, auth_token: str, project_root: pathlib.Path, + shaman_endpoint: str, checkout_id: str) -> None: + from . import client + super().__init__() + self.client = client.ShamanClient(auth_token, shaman_endpoint) + self.project_root = project_root + self.checkout_id = checkout_id + self.log = logging.getLogger(__name__) + + self._file_info = {} # type: typing.Dict[str, FileInfo] + + # When the Shaman creates a checkout, it'll return the location of that + # checkout. This can then be combined with the project-relative path + # of the to-be-rendered blend file (e.g. the one 'bat pack' was pointed + # at). + self._checkout_location = '' + + # noinspection PyBroadException + def run(self) -> None: + try: + # Construct the Shaman Checkout Definition file. + # This blocks until we know the entire list of files to transfer. + definition_file, allowed_relpaths, delete_when_done = self._create_checkout_definition() + if not definition_file: + # An error has already been logged. + return + + self.log.info('Created checkout definition file of %d KiB', + len(definition_file) // 1024) + self.log.info('Feeding %d files to the Shaman', len(self._file_info)) + + # Try to upload all the files. + failed_paths = set() # type: typing.Set[str] + max_tries = 50 + for try_index in range(max_tries): + # Send the file to the Shaman and see what we still need to send there. 
+ to_upload = self._send_checkout_def_to_shaman(definition_file, allowed_relpaths) + if to_upload is None: + # An error has already been logged. + return + + # Send the files that still need to be sent. + self.log.info('Upload attempt %d', try_index+1) + failed_paths = self._upload_files(to_upload) + if not failed_paths: + break + + # Having failed paths at this point is expected when multiple + # clients are sending the same files. Instead of retrying on a + # file-by-file basis, we just re-send the checkout definition + # file to the Shaman and obtain a new list of files to upload. + + if failed_paths: + self.log.error('Aborting upload due to too many failures') + self.error_set('Giving up after %d attempts to upload the files' % max_tries) + return + + self.log.info('All files uploaded succesfully') + self._request_checkout(definition_file) + + # Delete the files that were supposed to be moved. + for src in delete_when_done: + self.delete_file(src) + + except Exception as ex: + # We have to catch exceptions in a broad way, as this is running in + # a separate thread, and exceptions won't otherwise be seen. + self.log.exception('Error transferring files to Shaman') + self.error_set('Unexpected exception transferring files to Shaman: %s' % ex) + + # noinspection PyBroadException + def _create_checkout_definition(self) \ + -> typing.Tuple[bytes, typing.Set[str], typing.List[pathlib.Path]]: + """Create the checkout definition file for this BAT pack. + + :returns: the checkout definition (as bytes), a set of paths in that file, + and list of paths to delete. + + If there was an error and file transfer was aborted, the checkout + definition file will be empty. + """ + from . import cache + + definition_lines = [] # type: typing.List[bytes] + delete_when_done = [] # type: typing.List[pathlib.Path] + + # We keep track of the relative paths we want to send to the Shaman, + # so that the Shaman cannot ask us to upload files we didn't want to. 
+ relpaths = set() # type: typing.Set[str] + + for src, dst, act in self.iter_queue(): + try: + checksum = cache.compute_cached_checksum(src) + filesize = src.stat().st_size + # relpath = dst.relative_to(self.project_root) + relpath = str(dst)[1:] + + self._file_info[relpath] = FileInfo( + checksum=checksum, + filesize=filesize, + abspath=src, + ) + line = '%s %s %s' % (checksum, filesize, relpath) + definition_lines.append(line.encode('utf8')) + relpaths.add(relpath) + + if act == bat_transfer.Action.MOVE: + delete_when_done.append(src) + except Exception: + # We have to catch exceptions in a broad way, as this is running in + # a separate thread, and exceptions won't otherwise be seen. + msg = 'Error transferring %s to %s' % (src, dst) + self.log.exception(msg) + # Put the files to copy back into the queue, and abort. This allows + # the main thread to inspect the queue and see which files were not + # copied. The one we just failed (due to this exception) should also + # be reported there. + self.queue.put((src, dst, act)) + self.error_set(msg) + return b'', set(), delete_when_done + + cache.cleanup_cache() + return b'\n'.join(definition_lines), relpaths, delete_when_done + + def _send_checkout_def_to_shaman(self, definition_file: bytes, + allowed_relpaths: typing.Set[str]) \ + -> typing.Optional[typing.Deque[str]]: + """Send the checkout definition file to the Shaman. + + :return: An iterable of paths (relative to the project root) that still + need to be uploaded, or None if there was an error. 
+ """ + resp = self.client.post('checkout/requirements', data=definition_file, stream=True, + headers={'Content-Type': 'text/plain'}) + if resp.status_code >= 300: + msg = 'Error from Shaman, code %d: %s' % (resp.status_code, resp.text) + self.log.error(msg) + self.error_set(msg) + return None + + to_upload = collections.deque() # type: typing.Deque[str] + for line in resp.iter_lines(): + response, path = line.decode().split(' ', 1) + self.log.debug(' %s: %s', response, path) + + if path not in allowed_relpaths: + msg = 'Shaman requested path we did not intend to upload: %r' % path + self.log.error(msg) + self.error_set(msg) + return None + + if response == 'does-not-exist': + to_upload.appendleft(path) + elif response == 'already-uploading': + to_upload.append(path) + elif response == 'ERROR': + msg = 'Error from Shaman: %s' % path + self.log.error(msg) + self.error_set(msg) + return None + else: + msg = 'Unknown response from Shaman for path %r: %r' % (path, response) + self.log.error(msg) + self.error_set(msg) + return None + + return to_upload + + def _upload_files(self, to_upload: typing.Deque[str]) -> typing.Set[str]: + """Actually upload the files to Shaman. + + Returns the set of files that we did not upload. + """ + failed_paths = set() # type: typing.Set[str] + deferred_paths = set() + + def defer(some_path: str): + nonlocal to_upload + + self.log.info(' %s deferred (already being uploaded by someone else)', some_path) + deferred_paths.add(some_path) + + # Instead of deferring this one file, randomize the files to upload. + # This prevents multiple deferrals when someone else is uploading + # files from the same project (because it probably happens alphabetically). 
+ all_files = list(to_upload) + random.shuffle(all_files) + to_upload = collections.deque(all_files) + + if not to_upload: + self.log.info('All %d files are at the Shaman already', len(self._file_info)) + self.report_transferred(0) + return failed_paths + + self.log.info('Going to upload %d of %d files', len(to_upload), len(self._file_info)) + uploaded_files = 0 + uploaded_bytes = 0 + while to_upload: + # After too many failures, just retry to get a fresh set of files to upload. + if len(failed_paths) > MAX_FAILED_PATHS: + self.log.info('Too many failures, going to abort this iteration') + failed_paths.update(to_upload) + return failed_paths + + path = to_upload.popleft() + fileinfo = self._file_info[path] + self.log.info(' %s', path) + + # Let the Shaman know whether we can defer uploading this file or not. + headers = {} + can_defer = (len(deferred_paths) < MAX_DEFERRED_PATHS + and path not in deferred_paths + and len(to_upload)) + if can_defer: + headers['X-Shaman-Can-Defer-Upload'] = 'true' + + url = 'files/%s/%d' % (fileinfo.checksum, fileinfo.filesize) + try: + with fileinfo.abspath.open('rb') as infile: + resp = self.client.post(url, data=infile, headers=headers) + + except requests.ConnectionError as ex: + if can_defer: + # Closing the connection with an 'X-Shaman-Can-Defer-Upload: true' header + # indicates that we should defer the upload. Requests doesn't give us the + # reply, even though it was written by the Shaman before it closed the + # connection. + defer(path) + else: + self.log.info(' %s could not be uploaded, might retry later: %s', path, ex) + failed_paths.add(path) + continue + + if resp.status_code == 208: + # For small files we get the 208 response, because the server closes the + # connection after we sent the entire request. For bigger files the server + # responds sooner, and Requests gives us the above ConnectionError. 
+ if can_defer: + defer(path) + else: + self.log.info(' %s skipped (already existed on the server)', path) + continue + + if resp.status_code >= 300: + msg = 'Error from Shaman uploading %s, code %d: %s' % ( + fileinfo.abspath, resp.status_code, resp.text) + self.log.error(msg) + self.error_set(msg) + return failed_paths + + failed_paths.discard(path) + uploaded_files += 1 + file_size = fileinfo.abspath.stat().st_size + uploaded_bytes += file_size + self.report_transferred(file_size) + + self.log.info('Uploaded %d bytes in %d files (%d files were already there)', + uploaded_bytes, uploaded_files, len(to_upload) - uploaded_files) + + if not failed_paths: + self.log.info('Done uploading files') + + return failed_paths + + def report_transferred(self, bytes_transferred: int): + if self._abort.is_set(): + self.log.warning('Interrupting ongoing upload') + raise self.AbortUpload('interrupting ongoing upload') + super().report_transferred(bytes_transferred) + + def _request_checkout(self, definition_file: bytes): + """Ask the Shaman to create a checkout of this BAT pack.""" + + if not self.checkout_id: + self.log.warning('NOT requesting checkout at Shaman') + return + + self.log.info('Requesting checkout at Shaman for checkout_id=%r', self.checkout_id) + resp = self.client.post('checkout/create/%s' % self.checkout_id, data=definition_file, + headers={'Content-Type': 'text/plain'}) + if resp.status_code >= 300: + msg = 'Error from Shaman, code %d: %s' % (resp.status_code, resp.text) + self.log.error(msg) + self.error_set(msg) + return + + self._checkout_location = resp.text.strip() + self.log.info('Response from Shaman, code %d: %s', resp.status_code, resp.text) + + @property + def checkout_location(self) -> str: + if not self._checkout_location: + raise ValueError('No checkout was created yet.') + return self._checkout_location diff --git a/tests/abstract_test.py b/tests/abstract_test.py index ef07db7..083d86f 100644 --- a/tests/abstract_test.py +++ 
b/tests/abstract_test.py @@ -1,3 +1,22 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. Stüvel import logging import pathlib import unittest @@ -8,6 +27,7 @@ logging.basicConfig( format='%(asctime)-15s %(levelname)8s %(name)s %(message)s', level=logging.INFO) + class AbstractBlendFileTest(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/tests/test_shaman_cache.py b/tests/test_shaman_cache.py new file mode 100644 index 0000000..1062145 --- /dev/null +++ b/tests/test_shaman_cache.py @@ -0,0 +1,87 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. Stüvel +import json +import pathlib +from unittest import mock + +from abstract_test import AbstractBlendFileTest +from blender_asset_tracer.pack.shaman import cache + + +class AbstractChecksumTest(AbstractBlendFileTest): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.test_file = cls.blendfiles / 'linked_cube_compressed.blend' + cls.expected_checksum = '3c525e3a01ece11f26ded1e05e43284c4cce575c8074b97c6bdbc414fa2802ab' + + +class ChecksumTest(AbstractChecksumTest): + def test_checksum(self): + self.assertEqual(self.expected_checksum, cache.compute_checksum(self.test_file)) + + +class CachedChecksumTest(AbstractChecksumTest): + @mock.patch('blender_asset_tracer.pack.shaman.cache._cache_path') + @mock.patch('blender_asset_tracer.pack.shaman.cache.compute_checksum') + def test_cache_invalid_json(self, mock_compute_checksum, mock_cache_path): + mock_path = mock.MagicMock(spec=pathlib.Path) + mock_path.open().__enter__().read.return_value = 'je moeder' + mock_cache_path.return_value = mock_path + + mock_compute_checksum.return_value = 'computed-checksum' + + checksum = cache.compute_cached_checksum(self.test_file) + self.assertEqual('computed-checksum', checksum) + + @mock.patch('blender_asset_tracer.pack.shaman.cache._cache_path') + @mock.patch('blender_asset_tracer.pack.shaman.cache.compute_checksum') + def test_cache_valid_json(self, mock_compute_checksum, mock_cache_path): + stat = self.test_file.stat() + cache_info = { + 'checksum': 'cached-checksum', + 'file_mtime': stat.st_mtime + 0.0001, # mimic a slight clock skew + 'file_size': stat.st_size, + } + + mock_path = mock.MagicMock(spec=pathlib.Path) + mock_path.open().__enter__().read.return_value =
json.dumps(cache_info) + mock_cache_path.return_value = mock_path + + mock_compute_checksum.return_value = 'computed-checksum' + + checksum = cache.compute_cached_checksum(self.test_file) + self.assertEqual('cached-checksum', checksum) + + @mock.patch('blender_asset_tracer.pack.shaman.cache._cache_path') + @mock.patch('blender_asset_tracer.pack.shaman.cache.compute_checksum') + def test_cache_not_exists(self, mock_compute_checksum, mock_cache_path): + mock_path = mock.MagicMock(spec=pathlib.Path) + mock_path.open.side_effect = [ + FileNotFoundError('Testing absent cache file'), + FileExistsError('Testing I/O error when writing'), + ] + mock_cache_path.return_value = mock_path + + mock_compute_checksum.return_value = 'computed-checksum' + + # This should not raise the FileExistsError + checksum = cache.compute_cached_checksum(self.test_file) + self.assertEqual('computed-checksum', checksum) diff --git a/tests/test_shaman_time_tracker.py b/tests/test_shaman_time_tracker.py new file mode 100644 index 0000000..95c2bc7 --- /dev/null +++ b/tests/test_shaman_time_tracker.py @@ -0,0 +1,29 @@ +import unittest +from unittest import mock + +from blender_asset_tracer.pack.shaman import time_tracker + + +class TimeTrackerTest(unittest.TestCase): + @mock.patch('time.monotonic') + def test_empty_class(self, mock_monotonic): + class TestClass: + pass + + mock_monotonic.side_effect = [1.25, 4.75] + with time_tracker.track_time(TestClass, 'some_attr'): + pass + + # noinspection PyUnresolvedReferences + self.assertEqual(3.5, TestClass.some_attr) + + @mock.patch('time.monotonic') + def test_with_value(self, mock_monotonic): + class TestClass: + some_attr = 4.125 + + mock_monotonic.side_effect = [1.25, 4.75] + with time_tracker.track_time(TestClass, 'some_attr'): + pass + + self.assertEqual(3.5 + 4.125, TestClass.some_attr) diff --git a/tests/test_shaman_transfer.py b/tests/test_shaman_transfer.py new file mode 100644 index 0000000..a213685 --- /dev/null +++ 
b/tests/test_shaman_transfer.py @@ -0,0 +1,93 @@ +# ***** BEGIN GPL LICENSE BLOCK ***** +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# ***** END GPL LICENCE BLOCK ***** +# +# (c) 2019, Blender Foundation - Sybren A. Stüvel +import json +import pathlib +from unittest import mock + +import responses + +from abstract_test import AbstractBlendFileTest +from blender_asset_tracer.pack.shaman import transfer + +httpmock = responses.RequestsMock() + + +class AbstractChecksumTest(AbstractBlendFileTest): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.test_file1 = cls.blendfiles / 'linked_cube_compressed.blend' + cls.test_file2 = cls.blendfiles / 'basic_file.blend' + cls.expected_checksums = { + cls.test_file1: '3c525e3a01ece11f26ded1e05e43284c4cce575c8074b97c6bdbc414fa2802ab', + cls.test_file2: 'd5283988d95f259069d4cd3c25a40526090534b8d188577b6c6fb36c3d481454', + } + cls.file_sizes = { + cls.test_file1: cls.test_file1.stat().st_size, + cls.test_file2: cls.test_file2.stat().st_size, + } + cls.packed_names = { + cls.test_file1: pathlib.Path('path/in/pack/test1.blend'), + cls.test_file2: pathlib.Path('path/in/pack/test2.blend'), + } + + def assertValidCheckoutDef(self, definition_file: bytes): + # We don't care much about the order, so compare as set. 
+ expect_lines = set() + for filepath in [self.test_file1, self.test_file2]: + checksum = self.expected_checksums[filepath] + fsize = self.file_sizes[filepath] + relpath = str(self.packed_names[filepath]) + expect_lines.add(b'%s %d %s' % (checksum.encode(), fsize, relpath.encode())) + self.assertEqual(expect_lines, set(definition_file.split(b'\n'))) + + @httpmock.activate + def test_checkout_happy(self): + checksum1 = self.expected_checksums[self.test_file1] + fsize1 = self.file_sizes[self.test_file1] + + def mock_requirements(request): + self.assertEqual('text/plain', request.headers['Content-Type']) + self.assertValidCheckoutDef(request.body) + + body = 'does-not-exist path/in/pack/test1.blend\n' + return 200, {'Content-Type': 'text/plain'}, body + + def mock_checkout_create(request): + self.assertEqual('text/plain', request.headers['Content-Type']) + self.assertValidCheckoutDef(request.body) + return 200, {'Content-Type': 'text/plain'}, 'DA/-JOB-ID' + + httpmock.add_callback('POST', 'http://unittest.local:1234/checkout/requirements', + callback=mock_requirements) + + httpmock.add('POST', 'http://unittest.local:1234/files/%s/%d' % (checksum1, fsize1)) + httpmock.add_callback('POST', 'http://unittest.local:1234/checkout/create/DA-JOB-ID', + callback=mock_checkout_create) + + trans = transfer.ShamanTransferrer('auth-token', self.blendfiles, + 'http://unittest.local:1234/', 'DA-JOB-ID') + trans.start() + trans.queue_copy(self.test_file1, pathlib.Path('/') / self.packed_names[self.test_file1]) + trans.queue_copy(self.test_file2, pathlib.Path('/') / self.packed_names[self.test_file2]) + trans.done_and_join() + + self.assertFalse(trans.has_error) + self.assertEqual('DA/-JOB-ID', trans.checkout_location)