in gslib/utils/copy_helper.py [0:0]
def _UploadFileToObjectResumable(src_url,
src_obj_filestream,
src_obj_size,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
logger,
is_component=False,
gzip_encoded=False):
"""Uploads the file using a resumable strategy.
Args:
src_url: Source FileUrl to upload. Must not be a stream.
src_obj_filestream: File pointer to uploadable bytes.
src_obj_size (int or None): Size of the source object.
dst_url: Destination StorageUrl for the upload.
dst_obj_metadata: Metadata for the target object.
preconditions: Preconditions for the upload, if any.
gsutil_api: gsutil Cloud API instance to use for the upload.
logger: for outputting log messages.
is_component: indicates whether this is a single component or whole file.
gzip_encoded: Whether to use gzip transport encoding for the upload.
Returns:
A (elapsed upload time, uploaded Object) tuple, where the Object has its
generation, md5, and size fields populated.
"""
tracker_file_name = GetTrackerFilePath(
dst_url, TrackerFileType.UPLOAD,
gsutil_api.GetApiSelector(provider=dst_url.scheme))
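# Compute the SHA256 of the customer-supplied encryption key, if any; it is
# stored in the tracker file below and passed to GetUploadTrackerData so that
# saved upload state is only reused with the same key.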
encryption_keywrapper = GetEncryptionKeyWrapper(config)
encryption_key_sha256 = (
encryption_keywrapper.crypto_key_sha256.decode('ascii')
if encryption_keywrapper and encryption_keywrapper.crypto_key_sha256 else
None)
def _UploadTrackerCallback(serialization_data):
"""Creates a new tracker file for starting an upload from scratch.
This function is called by the gsutil Cloud API implementation, and the
serialization data is implementation-specific.
Args:
serialization_data: Serialization data used in resuming the upload.
"""
data = {
ENCRYPTION_UPLOAD_TRACKER_ENTRY: encryption_key_sha256,
SERIALIZATION_UPLOAD_TRACKER_ENTRY: str(serialization_data)
}
WriteJsonDataToTrackerFile(tracker_file_name, data)
# This contains the upload URL, which will uniquely identify the
# destination object.
tracker_data = GetUploadTrackerData(
tracker_file_name, logger, encryption_key_sha256=encryption_key_sha256)
if tracker_data:
logger.info('Resuming upload for %s', src_url.url_string)
retryable = True
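# Progress messages for a parallel composite component are tagged with its
# component number so status reporting can distinguish components.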
component_num = _GetComponentNumber(dst_url) if is_component else None
progress_callback = FileProgressCallbackHandler(
gsutil_api.status_queue,
src_url=src_url,
component_num=component_num,
dst_url=dst_url,
operation_name='Uploading').call
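# For testing, a pickled callback supplied via test_callback_file replaces the
# default progress handler (e.g. to halt a transfer partway through).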
if global_copy_helper_opts.test_callback_file:
with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
progress_callback = pickle.loads(test_fp.read()).call
start_time = time.time()
num_startover_attempts = 0
# This loop causes us to retry when the resumable upload failed in a way that
# requires starting over with a new upload ID. Retries within a single upload
# ID within the current process are handled in
# gsutil_api.UploadObjectResumable, and retries within a single upload ID
# spanning processes happen if an exception not caught below occurs (which
# will leave the tracker file in place and cause the upload ID to be reused
# the next time the user runs gsutil and attempts the same upload).
while retryable:
try:
uploaded_object = gsutil_api.UploadObjectResumable(
src_obj_filestream,
object_metadata=dst_obj_metadata,
canned_acl=global_copy_helper_opts.canned_acl,
preconditions=preconditions,
provider=dst_url.scheme,
size=src_obj_size,
serialization_data=tracker_data,
encryption_tuple=encryption_keywrapper,
fields=UPLOAD_RETURN_FIELDS,
tracker_callback=_UploadTrackerCallback,
progress_callback=progress_callback,
gzip_encoded=gzip_encoded)
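# The upload completed successfully; exit the retry loop.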
retryable = False
except ResumableUploadStartOverException as e:
logger.info('Caught ResumableUploadStartOverException for upload of %s.' %
src_url.url_string)
# This can happen, for example, if the server sends a 410 response code.
# In that case the current resumable upload ID can't be reused, so delete
# the tracker file and try again up to max retries.
num_startover_attempts += 1
retryable = (num_startover_attempts < GetNumRetries())
if not retryable:
raise
# If the server sends a 404 response code, then the upload should only
# be restarted if it was the object (and not the bucket) that was missing.
try:
logger.info('Checking that bucket %s exists before retrying upload...' %
dst_obj_metadata.bucket)
gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
except AccessDeniedException:
# Proceed with deleting the tracker file in the event that the bucket
# exists, but the user does not have permission to view its metadata.
pass
except NotFoundException:
raise
finally:
DeleteTrackerFile(tracker_file_name)
logger.info('Deleted tracker file %s for resumable upload of %s before '
'retrying.' % (tracker_file_name, src_url.url_string))
logger.info(
'Restarting upload of %s from scratch (retry #%d) after exception '
'indicating we need to start over with a new resumable upload ID: %s'
% (src_url.url_string, num_startover_attempts, e))
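# Discard the old serialization data and rewind the source stream so the next
# attempt starts a brand-new resumable upload from the first byte.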
tracker_data = None
src_obj_filestream.seek(0)
# Reset the progress callback handler.
component_num = _GetComponentNumber(dst_url) if is_component else None
progress_callback = FileProgressCallbackHandler(
gsutil_api.status_queue,
src_url=src_url,
component_num=component_num,
dst_url=dst_url,
operation_name='Uploading').call
# Report the retryable error to the global status queue.
PutToQueueWithTimeout(
gsutil_api.status_queue,
RetryableErrorMessage(e,
time.time(),
num_retries=num_startover_attempts))
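# Wait with randomized exponential backoff, capped at GetMaxRetryDelay(),
# before starting the upload over.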
time.sleep(
min(random.random() * (2**num_startover_attempts),
GetMaxRetryDelay()))
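# An aborted resumable upload is not restarted; mark it non-retryable so the
# tracker file is deleted in the finally clause below, and re-raise.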
except ResumableUploadAbortException:
retryable = False
raise
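# Once we are not going to retry (success or permanent failure), the tracker
# file is no longer needed.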
finally:
if not retryable:
DeleteTrackerFile(tracker_file_name)
end_time = time.time()
elapsed_time = end_time - start_time
return (elapsed_time, uploaded_object)