in gslib/utils/copy_helper.py [0:0]
def _DoParallelCompositeUpload(fp,
                               src_url,
                               dst_url,
                               dst_obj_metadata,
                               canned_acl,
                               file_size,
                               preconditions,
                               gsutil_api,
                               command_obj,
                               copy_exception_handler,
                               logger,
                               gzip_encoded=False):
  """Uploads a local file to a cloud object using parallel composite upload.

  The file is partitioned into parts, and then the parts are uploaded in
  parallel, composed to form the original destination object, and deleted.
  Resumability across retries is provided by a tracker file that records the
  component-name prefix and which component objects were already uploaded.

  Args:
    fp: The file object to be uploaded.
    src_url: FileUrl representing the local file.
    dst_url: CloudUrl representing the destination file.
    dst_obj_metadata: apitools Object describing the destination object.
    canned_acl: The canned acl to apply to the object, if any.
    file_size: The size of the source file in bytes.
    preconditions: Cloud API Preconditions for the final object.
    gsutil_api: gsutil Cloud API instance to use.
    command_obj: Command object (for calling Apply).
    copy_exception_handler: Copy exception handler (for use in Apply).
    logger: logging.Logger for outputting log messages.
    gzip_encoded: Whether to use gzip transport encoding for the upload.

  Returns:
    Tuple of (elapsed upload time in seconds, uploaded Object with
    generation, crc32c, and size fields populated).

  Raises:
    CommandException: If one or more component uploads failed. Successfully
      uploaded components are intentionally left in place so a subsequent
      retry can resume from them via the tracker file.
  """
  start_time = time.time()
  dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
  api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)

  encryption_keywrapper = GetEncryptionKeyWrapper(config)
  encryption_key_sha256 = (encryption_keywrapper.crypto_key_sha256
                           if encryption_keywrapper else None)

  # Determine which components, if any, have already been successfully
  # uploaded.
  tracker_file_name = GetTrackerFilePath(dst_url,
                                         TrackerFileType.PARALLEL_UPLOAD,
                                         api_selector, src_url)
  (existing_enc_key_sha256, existing_prefix,
   existing_components) = (ReadParallelUploadTrackerFile(
       tracker_file_name, logger))

  # Ensure that the tracker data is still valid (encryption keys match) and
  # perform any necessary cleanup.
  (existing_prefix, existing_components) = ValidateParallelCompositeTrackerData(
      tracker_file_name, existing_enc_key_sha256, existing_prefix,
      existing_components, encryption_key_sha256, dst_bucket_url, command_obj,
      logger, _DeleteTempComponentObjectFn, _RmExceptionHandler)

  # The decode here implies crypto_key_sha256 is bytes; normalize to text
  # before writing it back to the tracker file.
  encryption_key_sha256 = (encryption_key_sha256.decode('ascii')
                           if encryption_key_sha256 is not None else None)
  # Reuse the prefix from a prior (interrupted) run if one exists so that
  # already-uploaded components can be found and reused.
  random_prefix = (existing_prefix if existing_prefix is not None else
                   GenerateComponentObjectPrefix(
                       encryption_key_sha256=encryption_key_sha256))

  # Create (or overwrite) the tracker file for the upload.
  WriteParallelUploadTrackerFile(tracker_file_name,
                                 random_prefix,
                                 existing_components,
                                 encryption_key_sha256=encryption_key_sha256)

  # Protect the tracker file within calls to Apply.
  tracker_file_lock = parallelism_framework_util.CreateLock()
  # Dict to track component info so we may align FileMessage values
  # before and after the operation.
  components_info = {}
  # Get the set of all components that should be uploaded.
  dst_args = _PartitionFile(canned_acl,
                            dst_obj_metadata.contentType,
                            dst_bucket_url,
                            file_size,
                            fp,
                            random_prefix,
                            src_url,
                            dst_obj_metadata.storageClass,
                            tracker_file_name,
                            tracker_file_lock,
                            encryption_key_sha256=encryption_key_sha256,
                            gzip_encoded=gzip_encoded)

  (components_to_upload, existing_components,
   existing_objects_to_delete) = (FilterExistingComponents(
       dst_args, existing_components, dst_bucket_url, gsutil_api))

  # Assign a start message to each different component type
  for component in components_to_upload:
    components_info[component.dst_url.url_string] = (
        FileMessage.COMPONENT_TO_UPLOAD, component.file_length)
    PutToQueueWithTimeout(
        gsutil_api.status_queue,
        FileMessage(component.src_url,
                    component.dst_url,
                    time.time(),
                    size=component.file_length,
                    finished=False,
                    component_num=_GetComponentNumber(component.dst_url),
                    message_type=FileMessage.COMPONENT_TO_UPLOAD))

  # existing_components entries are (url, size) pairs.
  for component in existing_components:
    component_str = component[0].versionless_url_string
    components_info[component_str] = (FileMessage.EXISTING_COMPONENT,
                                      component[1])
    PutToQueueWithTimeout(
        gsutil_api.status_queue,
        FileMessage(src_url,
                    component[0],
                    time.time(),
                    finished=False,
                    size=component[1],
                    component_num=_GetComponentNumber(component[0]),
                    message_type=FileMessage.EXISTING_COMPONENT))

  for component in existing_objects_to_delete:
    PutToQueueWithTimeout(
        gsutil_api.status_queue,
        FileMessage(src_url,
                    component,
                    time.time(),
                    finished=False,
                    message_type=FileMessage.EXISTING_OBJECT_TO_DELETE))

  # In parallel, copy all of the file parts that haven't already been
  # uploaded to temporary objects.
  cp_results = command_obj.Apply(
      _PerformParallelUploadFileToObject,
      components_to_upload,
      copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
      arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=command_obj.ParallelOverrideReason.SLICE,
      should_return_results=True)
  uploaded_components = []
  for cp_result in cp_results:
    # Each Apply result is a tuple whose third element is the component URL.
    uploaded_components.append(cp_result[2])
  components = uploaded_components + [i[0] for i in existing_components]

  if len(components) == len(dst_args):
    # Only try to compose if all of the components were uploaded successfully.
    # Sort the components so that they will be composed in the correct order.
    components = sorted(components, key=_GetComponentNumber)

    request_components = []
    for component_url in components:
      src_obj_metadata = (
          apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
              name=component_url.object_name))
      if component_url.HasGeneration():
        # Use int() rather than the Python 2-only long() builtin: int()
        # raises no NameError on Python 3 and auto-promotes past 2**31 on
        # Python 2, so both handle 64-bit generation numbers.
        src_obj_metadata.generation = int(component_url.generation)
      request_components.append(src_obj_metadata)

    composed_object = gsutil_api.ComposeObject(
        request_components,
        dst_obj_metadata,
        preconditions=preconditions,
        provider=dst_url.scheme,
        fields=['crc32c', 'generation', 'size'],
        encryption_tuple=encryption_keywrapper)

    try:
      # Make sure only to delete things that we know were successfully
      # uploaded (as opposed to all of the objects that we attempted to
      # create) so that we don't delete any preexisting objects, except for
      # those that were uploaded by a previous, failed run and have since
      # changed (but still have an old generation lying around).
      objects_to_delete = components + existing_objects_to_delete
      command_obj.Apply(
          _DeleteTempComponentObjectFn,
          objects_to_delete,
          _RmExceptionHandler,
          arg_checker=gslib.command.DummyArgChecker,
          parallel_operations_override=command_obj.ParallelOverrideReason.SLICE)
      # Assign an end message to each different component type
      for component in components:
        component_str = component.versionless_url_string
        try:
          PutToQueueWithTimeout(
              gsutil_api.status_queue,
              FileMessage(src_url,
                          component,
                          time.time(),
                          finished=True,
                          component_num=_GetComponentNumber(component),
                          size=components_info[component_str][1],
                          message_type=components_info[component_str][0]))
        except:  # pylint: disable=bare-except
          # Status reporting is best-effort; fall back to a minimal message
          # rather than letting a reporting error break a successful copy.
          PutToQueueWithTimeout(
              gsutil_api.status_queue,
              FileMessage(src_url, component, time.time(), finished=True))

      for component in existing_objects_to_delete:
        PutToQueueWithTimeout(
            gsutil_api.status_queue,
            FileMessage(src_url,
                        component,
                        time.time(),
                        finished=True,
                        message_type=FileMessage.EXISTING_OBJECT_TO_DELETE))
    except Exception:  # pylint: disable=broad-except
      # If some of the delete calls fail, don't cause the whole command to
      # fail. The copy was successful iff the compose call succeeded, so
      # reduce this to a warning. (warning() is the non-deprecated spelling
      # of the old Logger.warn alias.)
      logger.warning(
          'Failed to delete some of the following temporary objects:\n' +
          '\n'.join(dst_args.keys()))
    finally:
      # The upload is complete (or cleanup was attempted); the tracker file
      # is no longer needed.
      with tracker_file_lock:
        DeleteTrackerFile(tracker_file_name)
  else:
    # Some of the components failed to upload. In this case, we want to exit
    # without deleting the objects.
    raise CommandException(
        'Some temporary components were not uploaded successfully. '
        'Please retry this upload.')

  elapsed_time = time.time() - start_time
  return elapsed_time, composed_object