def _DoParallelCompositeUpload()

in gslib/utils/copy_helper.py


def _DoParallelCompositeUpload(fp,
                               src_url,
                               dst_url,
                               dst_obj_metadata,
                               canned_acl,
                               file_size,
                               preconditions,
                               gsutil_api,
                               command_obj,
                               copy_exception_handler,
                               logger,
                               gzip_encoded=False):
  """Uploads a local file to a cloud object using parallel composite upload.

  The file is partitioned into parts, which are then uploaded in parallel,
  composed to form the final destination object, and deleted.

  Args:
    fp: The file object to be uploaded.
    src_url: FileUrl representing the local file.
    dst_url: CloudUrl representing the destination file.
    dst_obj_metadata: apitools Object describing the destination object.
    canned_acl: The canned acl to apply to the object, if any.
    file_size: The size of the source file in bytes.
    preconditions: Cloud API Preconditions for the final object.
    gsutil_api: gsutil Cloud API instance to use.
    command_obj: Command object (for calling Apply).
    copy_exception_handler: Copy exception handler (for use in Apply).
    logger: logging.Logger for outputting log messages.
    gzip_encoded: Whether to use gzip transport encoding for the upload.

  Returns:
    A (elapsed_upload_time, composed_object) tuple, where composed_object is
    the uploaded Object with its generation, crc32c, and size fields
    populated.
  """
  start_time = time.time()
  dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
  api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)

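  # Resolve any customer-supplied encryption key from the boto config. The
  # key's SHA256 is recorded in the tracker file so a resumed upload can
  # detect whether the encryption key changed since the last attempt.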
  encryption_keywrapper = GetEncryptionKeyWrapper(config)
  encryption_key_sha256 = (encryption_keywrapper.crypto_key_sha256
                           if encryption_keywrapper else None)

  # Determine which components, if any, have already been successfully
  # uploaded.
  tracker_file_name = GetTrackerFilePath(dst_url,
                                         TrackerFileType.PARALLEL_UPLOAD,
                                         api_selector, src_url)

  (existing_enc_key_sha256, existing_prefix,
   existing_components) = (ReadParallelUploadTrackerFile(
       tracker_file_name, logger))

  # Ensure that the tracker data is still valid (encryption keys match) and
  # perform any necessary cleanup.
  (existing_prefix, existing_components) = ValidateParallelCompositeTrackerData(
      tracker_file_name, existing_enc_key_sha256, existing_prefix,
      existing_components, encryption_key_sha256, dst_bucket_url, command_obj,
      logger, _DeleteTempComponentObjectFn, _RmExceptionHandler)

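  # Tracker files store the key hash as text, so normalize the bytes digest
  # to an ASCII string before reusing or generating a component name prefix.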
  encryption_key_sha256 = (encryption_key_sha256.decode('ascii')
                           if encryption_key_sha256 is not None else None)
  random_prefix = (existing_prefix if existing_prefix is not None else
                   GenerateComponentObjectPrefix(
                       encryption_key_sha256=encryption_key_sha256))

  # Create (or overwrite) the tracker file for the upload.
  WriteParallelUploadTrackerFile(tracker_file_name,
                                 random_prefix,
                                 existing_components,
                                 encryption_key_sha256=encryption_key_sha256)

  # Protect the tracker file within calls to Apply.
  tracker_file_lock = parallelism_framework_util.CreateLock()
  # Dict to track component info so we may align FileMessage values
  # before and after the operation.
  components_info = {}
  # Get the set of all components that should be uploaded.
  dst_args = _PartitionFile(canned_acl,
                            dst_obj_metadata.contentType,
                            dst_bucket_url,
                            file_size,
                            fp,
                            random_prefix,
                            src_url,
                            dst_obj_metadata.storageClass,
                            tracker_file_name,
                            tracker_file_lock,
                            encryption_key_sha256=encryption_key_sha256,
                            gzip_encoded=gzip_encoded)

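  # Split the partitioned components three ways: parts that still need to be
  # uploaded, parts already uploaded by a previous attempt that can be
  # reused, and stale objects from prior attempts that should be deleted.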
  (components_to_upload, existing_components,
   existing_objects_to_delete) = (FilterExistingComponents(
       dst_args, existing_components, dst_bucket_url, gsutil_api))

  # Send a start message for each component type.
  for component in components_to_upload:
    components_info[component.dst_url.url_string] = (
        FileMessage.COMPONENT_TO_UPLOAD, component.file_length)
    PutToQueueWithTimeout(
        gsutil_api.status_queue,
        FileMessage(component.src_url,
                    component.dst_url,
                    time.time(),
                    size=component.file_length,
                    finished=False,
                    component_num=_GetComponentNumber(component.dst_url),
                    message_type=FileMessage.COMPONENT_TO_UPLOAD))

  for component in existing_components:
    component_str = component[0].versionless_url_string
    components_info[component_str] = (FileMessage.EXISTING_COMPONENT,
                                      component[1])
    PutToQueueWithTimeout(
        gsutil_api.status_queue,
        FileMessage(src_url,
                    component[0],
                    time.time(),
                    finished=False,
                    size=component[1],
                    component_num=_GetComponentNumber(component[0]),
                    message_type=FileMessage.EXISTING_COMPONENT))

  for component in existing_objects_to_delete:
    PutToQueueWithTimeout(
        gsutil_api.status_queue,
        FileMessage(src_url,
                    component,
                    time.time(),
                    finished=False,
                    message_type=FileMessage.EXISTING_OBJECT_TO_DELETE))
  # In parallel, copy all of the file parts that haven't already been
  # uploaded to temporary objects.
  cp_results = command_obj.Apply(
      _PerformParallelUploadFileToObject,
      components_to_upload,
      copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
      arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=command_obj.ParallelOverrideReason.SLICE,
      should_return_results=True)
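  # Each Apply result is a tuple whose third element is the destination URL
  # of the uploaded component; collect those for the compose step.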
  uploaded_components = []
  for cp_result in cp_results:
    uploaded_components.append(cp_result[2])
  components = uploaded_components + [i[0] for i in existing_components]

  if len(components) == len(dst_args):
    # Only try to compose if all of the components were uploaded successfully.
    # Sort the components so that they will be composed in the correct order.
    components = sorted(components, key=_GetComponentNumber)

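    # Build the ComposeRequest source list, pinning each component to its
    # known generation so the compose operates on the exact uploaded parts.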
    request_components = []
    for component_url in components:
      src_obj_metadata = (
          apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
              name=component_url.object_name))
      if component_url.HasGeneration():
        src_obj_metadata.generation = int(component_url.generation)
      request_components.append(src_obj_metadata)

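    # Compose the uploaded parts server-side into the final destination
    # object, requesting only the fields the caller needs (crc32c,
    # generation, size).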
    composed_object = gsutil_api.ComposeObject(
        request_components,
        dst_obj_metadata,
        preconditions=preconditions,
        provider=dst_url.scheme,
        fields=['crc32c', 'generation', 'size'],
        encryption_tuple=encryption_keywrapper)

    try:
      # Make sure only to delete things that we know were successfully
      # uploaded (as opposed to all of the objects that we attempted to
      # create) so that we don't delete any preexisting objects, except for
      # those that were uploaded by a previous, failed run and have since
      # changed (but still have an old generation lying around).
      objects_to_delete = components + existing_objects_to_delete
      command_obj.Apply(
          _DeleteTempComponentObjectFn,
          objects_to_delete,
          _RmExceptionHandler,
          arg_checker=gslib.command.DummyArgChecker,
          parallel_operations_override=command_obj.ParallelOverrideReason.SLICE)
      # Send an end message for each component type.
      for component in components:
        component_str = component.versionless_url_string
        try:
          PutToQueueWithTimeout(
              gsutil_api.status_queue,
              FileMessage(src_url,
                          component,
                          time.time(),
                          finished=True,
                          component_num=_GetComponentNumber(component),
                          size=components_info[component_str][1],
                          message_type=components_info[component_str][0]))
        except:  # pylint: disable=bare-except
          PutToQueueWithTimeout(
              gsutil_api.status_queue,
              FileMessage(src_url, component, time.time(), finished=True))

      for component in existing_objects_to_delete:
        PutToQueueWithTimeout(
            gsutil_api.status_queue,
            FileMessage(src_url,
                        component,
                        time.time(),
                        finished=True,
                        message_type=FileMessage.EXISTING_OBJECT_TO_DELETE))
    except Exception:  # pylint: disable=broad-except
      # If some of the delete calls fail, don't cause the whole command to
      # fail. The copy was successful iff the compose call succeeded, so
      # reduce this to a warning.
      logger.warning(
          'Failed to delete some of the following temporary objects:\n' +
          '\n'.join(dst_args.keys()))
    finally:
      with tracker_file_lock:
        DeleteTrackerFile(tracker_file_name)
  else:
    # Some of the components failed to upload. In this case, we want to exit
    # without deleting the objects.
    raise CommandException(
        'Some temporary components were not uploaded successfully. '
        'Please retry this upload.')

  elapsed_time = time.time() - start_time
  return elapsed_time, composed_object
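
A minimal sketch of a call site (illustrative only: dst_obj_metadata,
gsutil_api, command_obj, and _CopyExceptionHandler are hypothetical
stand-ins for values that gsutil's copy machinery wires up before selecting
a parallel composite upload):

import logging
import os

from gslib.cloud_api import Preconditions
from gslib.storage_url import StorageUrlFromString

src_path = 'my_file.bin'
with open(src_path, 'rb') as fp:
  elapsed, composed = _DoParallelCompositeUpload(
      fp,
      StorageUrlFromString(src_path),                      # local FileUrl
      StorageUrlFromString('gs://my-bucket/my_file.bin'),  # dest CloudUrl
      dst_obj_metadata,       # apitools Object (contentType, storageClass)
      canned_acl=None,
      file_size=os.path.getsize(src_path),
      preconditions=Preconditions(),
      gsutil_api=gsutil_api,    # gsutil Cloud API instance
      command_obj=command_obj,  # Command instance providing Apply()
      copy_exception_handler=_CopyExceptionHandler,  # hypothetical handler
      logger=logging.getLogger(),
      gzip_encoded=False)
print('Uploaded %d bytes in %.1fs' % (composed.size, elapsed))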