Report progress of S3 uploads while the upload is happening
Previously it only reported progress after uploading each file.
This commit is contained in:
parent
cd32442f5a
commit
1111465061
@ -35,7 +35,7 @@ def compute_md5(filepath: pathlib.Path) -> str:
|
|||||||
hasher = hashlib.md5()
|
hasher = hashlib.md5()
|
||||||
with filepath.open('rb') as infile:
|
with filepath.open('rb') as infile:
|
||||||
while True:
|
while True:
|
||||||
block = infile.read(10240)
|
block = infile.read(102400)
|
||||||
if not block:
|
if not block:
|
||||||
break
|
break
|
||||||
hasher.update(block)
|
hasher.update(block)
|
||||||
@ -137,23 +137,22 @@ class S3Transferrer(transfer.FileTransferer):
|
|||||||
src, existing_md5)
|
src, existing_md5)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# TODO(Sybren): when queueing files inspect their size, and have a
|
|
||||||
# callback that reports the total progress.
|
|
||||||
log.info('Uploading %s', src)
|
log.info('Uploading %s', src)
|
||||||
try:
|
try:
|
||||||
self.client.upload_file(str(src),
|
self.client.upload_file(str(src),
|
||||||
Bucket=bucket,
|
Bucket=bucket,
|
||||||
Key=key,
|
Key=key,
|
||||||
Callback=self._upload_callback,
|
Callback=self.report_transferred,
|
||||||
ExtraArgs={'Metadata': {'md5': md5}})
|
ExtraArgs={'Metadata': {'md5': md5}})
|
||||||
except self.AbortUpload:
|
except self.AbortUpload:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _upload_callback(self, bytes_uploaded: int):
|
def report_transferred(self, bytes_transferred: int):
|
||||||
if self._abort.is_set():
|
if self._abort.is_set():
|
||||||
log.warning('Interrupting ongoing upload')
|
log.warning('Interrupting ongoing upload')
|
||||||
raise self.AbortUpload('interrupting ongoing upload')
|
raise self.AbortUpload('interrupting ongoing upload')
|
||||||
|
super().report_transferred(bytes_transferred)
|
||||||
|
|
||||||
def delete_file(self, path: pathlib.Path):
|
def delete_file(self, path: pathlib.Path):
|
||||||
"""Deletes a file, only logging a warning if deletion fails."""
|
"""Deletes a file, only logging a warning if deletion fails."""
|
||||||
@ -172,6 +171,7 @@ class S3Transferrer(transfer.FileTransferer):
|
|||||||
"""
|
"""
|
||||||
import botocore.exceptions
|
import botocore.exceptions
|
||||||
|
|
||||||
|
log.debug('Getting metadata of %s/%s', bucket, key)
|
||||||
try:
|
try:
|
||||||
info = self.client.head_object(Bucket=bucket, Key=key)
|
info = self.client.head_object(Bucket=bucket, Key=key)
|
||||||
except botocore.exceptions.ClientError as ex:
|
except botocore.exceptions.ClientError as ex:
|
||||||
|
|||||||
@ -95,10 +95,10 @@ class FileTransferer(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
self.queue.put((src, dst, Action.MOVE))
|
self.queue.put((src, dst, Action.MOVE))
|
||||||
self.total_queued_bytes += src.stat().st_size
|
self.total_queued_bytes += src.stat().st_size
|
||||||
|
|
||||||
def report_transferred(self, block_size: int):
|
def report_transferred(self, bytes_transferred: int):
|
||||||
"""Report transfer of `block_size` bytes."""
|
"""Report transfer of `block_size` bytes."""
|
||||||
|
|
||||||
self.total_transferred_bytes += block_size
|
self.total_transferred_bytes += bytes_transferred
|
||||||
self.progress_cb.transfer_progress(self.total_queued_bytes, self.total_transferred_bytes)
|
self.progress_cb.transfer_progress(self.total_queued_bytes, self.total_transferred_bytes)
|
||||||
|
|
||||||
def done_and_join(self) -> None:
|
def done_and_join(self) -> None:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user