in gslib/utils/copy_helper.py [0:0]
def _DownloadObjectToFileResumable(src_url,
                                   src_obj_metadata,
                                   dst_url,
                                   download_file_name,
                                   gsutil_api,
                                   logger,
                                   digesters,
                                   component_num=None,
                                   start_byte=0,
                                   end_byte=None,
                                   decryption_key=None):
"""Downloads an object to a local file using the resumable strategy.
Args:
src_url: Source CloudUrl.
src_obj_metadata: Metadata from the source object.
dst_url: Destination FileUrl.
download_file_name: Temporary file name to be used for download.
gsutil_api: gsutil Cloud API instance to use for the download.
logger: for outputting log messages.
digesters: Digesters corresponding to the hash algorithms that will be used
for validation.
component_num: Which component of a sliced download this call is for, or
None if this is not a sliced download.
start_byte: The first byte of a byte range for a sliced download.
end_byte: The last byte of a byte range for a sliced download.
decryption_key: Base64-encoded decryption key for the source object, if any.
Returns:
(bytes_transferred, server_encoding)
bytes_transferred: Number of bytes transferred from server this call.
server_encoding: Content-encoding string if it was detected that the server
sent encoded bytes during transfer, None otherwise.
"""
  if end_byte is None:
    end_byte = src_obj_metadata.size - 1
  download_size = end_byte - start_byte + 1

  is_sliced = component_num is not None
  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  server_encoding = None

  # Used for logging.
  download_name = dst_url.object_name
  if is_sliced:
    download_name += ' component %d' % component_num
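  # Open the destination file for in-place writes and position it at this
  # call's starting offset.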
  fp = None
  try:
    fp = open(download_file_name, 'r+b')
    fp.seek(start_byte)
    existing_file_size = GetFileSize(fp)
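    # Consult (or create) the tracker file to find the byte offset from which
    # this download should resume.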
    tracker_file_name, download_start_byte = ReadOrCreateDownloadTrackerFile(
        src_obj_metadata,
        dst_url,
        logger,
        api_selector,
        start_byte,
        existing_file_size,
        component_num,
    )
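    # A resume point outside this call's byte range means the tracker file is
    # stale or mismatched; delete it so a retry starts cleanly.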
    if download_start_byte < start_byte or download_start_byte > end_byte + 1:
      DeleteTrackerFile(tracker_file_name)
      raise CommandException(
          'Resumable download start point for %s is not in the correct byte '
          'range. Deleting tracker file, so if you re-try this download it '
          'will start from scratch' % download_name)
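    # Decide whether we're resuming a partial transfer or the requested range
    # is already fully on disk (in which case the network transfer below is
    # skipped and only hash catch-up and integrity checks run).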
    download_complete = (download_start_byte == start_byte + download_size)
    resuming = (download_start_byte != start_byte) and not download_complete
    if resuming:
      logger.info('Resuming download for %s', download_name)
    elif download_complete:
      logger.info(
          'Download already complete for %s, skipping download but '
          'will run integrity checks.', download_name)
    # This is used for resuming downloads, but also for passing the mediaLink
    # and size into the download for new downloads so that we can avoid
    # making an extra HTTP call.
    serialization_data = GetDownloadSerializationData(
        src_obj_metadata,
        progress=download_start_byte,
        user_project=gsutil_api.user_project)
    if resuming or download_complete:
      # Catch up our digester with the hash data.
      bytes_digested = 0
      total_bytes_to_digest = download_start_byte - start_byte
      hash_callback = ProgressCallbackWithTimeout(
          total_bytes_to_digest,
          FileProgressCallbackHandler(gsutil_api.status_queue,
                                      component_num=component_num,
                                      src_url=src_url,
                                      dst_url=dst_url,
                                      operation_name='Hashing').call)

      while bytes_digested < total_bytes_to_digest:
        bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
                            total_bytes_to_digest - bytes_digested)
        data = fp.read(bytes_to_read)
        bytes_digested += bytes_to_read
        for alg_name in digesters:
          digesters[alg_name].update(six.ensure_binary(data))
        hash_callback.Progress(len(data))
    elif not is_sliced:
      # Delete file contents and start entire object download from scratch.
      fp.truncate(0)
      existing_file_size = 0
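    # Stream progress updates for this download (or component) to the status
    # queue.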
    progress_callback = FileProgressCallbackHandler(
        gsutil_api.status_queue,
        start_byte=start_byte,
        override_total_size=download_size,
        src_url=src_url,
        dst_url=dst_url,
        component_num=component_num,
        operation_name='Downloading').call
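    # In tests, a pickled progress callback supplied via a file can replace
    # the handler constructed above.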
    if global_copy_helper_opts.test_callback_file:
      with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
        progress_callback = pickle.loads(test_fp.read()).call
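    # For sliced downloads large enough to be resumable, wrap the file object
    # so that component progress is checkpointed to the tracker file as bytes
    # are written.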
    if is_sliced and src_obj_metadata.size >= ResumableThreshold():
      fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
                                     start_byte, end_byte)
    compressed_encoding = ObjectIsGzipEncoded(src_obj_metadata)

    # TODO: With gzip encoding (which may occur on-the-fly and not be part of
    # the object's metadata), when we request a range to resume, it's possible
    # that the server will just resend the entire object, which means our
    # caught-up hash will be incorrect. We recalculate the hash on
    # the local file in the case of a failed gzip hash anyway, but it would
    # be better if we actively detected this case.
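    # Perform the remaining transfer; skipped entirely when every byte in the
    # requested range is already on disk.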
    if not download_complete:
      fp.seek(download_start_byte)
      server_encoding = gsutil_api.GetObjectMedia(
          src_url.bucket_name,
          src_url.object_name,
          fp,
          start_byte=download_start_byte,
          end_byte=end_byte,
          compressed_encoding=compressed_encoding,
          generation=src_url.generation,
          object_size=src_obj_metadata.size,
          download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
          provider=src_url.scheme,
          serialization_data=serialization_data,
          digesters=digesters,
          progress_callback=progress_callback,
          decryption_tuple=CryptoKeyWrapperFromKey(decryption_key))
  except ResumableDownloadException as e:
    logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
                   e.reason, download_name)
    raise
  finally:
    if fp:
      fp.close()
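  # Everything from the resume point through end_byte was fetched by this
  # call (zero if the download was already complete).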
  bytes_transferred = end_byte - download_start_byte + 1
  return bytes_transferred, server_encoding