def _UploadFileToObjectResumable()

in gslib/utils/copy_helper.py


def _UploadFileToObjectResumable(src_url,
                                 src_obj_filestream,
                                 src_obj_size,
                                 dst_url,
                                 dst_obj_metadata,
                                 preconditions,
                                 gsutil_api,
                                 logger,
                                 is_component=False,
                                 gzip_encoded=False):
  """Uploads the file using a resumable strategy.

  Args:
    src_url: Source FileUrl to upload.  Must not be a stream.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size (int or None): Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: for outputting log messages.
    is_component: indicates whether this is a single component or whole file.
    gzip_encoded: Whether to use gzip transport encoding for the upload.

  Returns:
    A tuple of (elapsed upload time, uploaded Object), where the Object has
    its generation, md5, and size fields populated.
  """
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.UPLOAD,
      gsutil_api.GetApiSelector(provider=dst_url.scheme))

  encryption_keywrapper = GetEncryptionKeyWrapper(config)
  encryption_key_sha256 = (
      encryption_keywrapper.crypto_key_sha256.decode('ascii')
      if encryption_keywrapper and encryption_keywrapper.crypto_key_sha256 else
      None)
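  # The key hash is stored in the upload tracker file so a resumed upload can
  # verify it is using the same encryption key as the original attempt.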

  def _UploadTrackerCallback(serialization_data):
    """Creates a new tracker file for starting an upload from scratch.

    This function is called by the gsutil Cloud API implementation, and the
    serialization data is implementation-specific.

    Args:
      serialization_data: Serialization data used in resuming the upload.
    """
    data = {
        ENCRYPTION_UPLOAD_TRACKER_ENTRY: encryption_key_sha256,
        SERIALIZATION_UPLOAD_TRACKER_ENTRY: str(serialization_data)
    }
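    # The resulting tracker file is a small JSON object with two entries: the
    # encryption key hash (if any) and the API-specific serialization data
    # (e.g., the resumable upload URL).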
    WriteJsonDataToTrackerFile(tracker_file_name, data)

  # This contains the upload URL, which will uniquely identify the
  # destination object.
  tracker_data = GetUploadTrackerData(
      tracker_file_name, logger, encryption_key_sha256=encryption_key_sha256)
  if tracker_data:
    logger.info('Resuming upload for %s', src_url.url_string)

  retryable = True

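  # For parallel composite uploads, tag progress messages with the component
  # number so the status queue can tell which piece of the file this is.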
  component_num = _GetComponentNumber(dst_url) if is_component else None
  progress_callback = FileProgressCallbackHandler(
      gsutil_api.status_queue,
      src_url=src_url,
      component_num=component_num,
      dst_url=dst_url,
      operation_name='Uploading').call

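  # Tests can substitute a pickled progress callback supplied via a file.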
  if global_copy_helper_opts.test_callback_file:
    with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call

  start_time = time.time()
  num_startover_attempts = 0
  # This loop causes us to retry when the resumable upload failed in a way
  # that requires starting over with a new upload ID. Retries within a single
  # upload ID within the current process are handled in
  # gsutil_api.UploadObjectResumable, and retries within a single upload ID
  # spanning processes happen if an exception not caught below occurs (which
  # leaves the tracker file in place and causes the upload ID to be reused
  # the next time the user runs gsutil and attempts the same upload).
  while retryable:
    try:
      uploaded_object = gsutil_api.UploadObjectResumable(
          src_obj_filestream,
          object_metadata=dst_obj_metadata,
          canned_acl=global_copy_helper_opts.canned_acl,
          preconditions=preconditions,
          provider=dst_url.scheme,
          size=src_obj_size,
          serialization_data=tracker_data,
          encryption_tuple=encryption_keywrapper,
          fields=UPLOAD_RETURN_FIELDS,
          tracker_callback=_UploadTrackerCallback,
          progress_callback=progress_callback,
          gzip_encoded=gzip_encoded)
      retryable = False
    except ResumableUploadStartOverException as e:
      logger.info('Caught ResumableUploadStartOverException for upload of %s.' %
                  src_url.url_string)
      # This can happen, for example, if the server sends a 410 response code.
      # In that case the current resumable upload ID can't be reused, so delete
      # the tracker file and try again up to max retries.
      num_startover_attempts += 1
      retryable = (num_startover_attempts < GetNumRetries())
      if not retryable:
        raise

      # If the server sends a 404 response code, then the upload should only
      # be restarted if it was the object (and not the bucket) that was missing.
      try:
        logger.info('Checking that bucket %s exists before retrying upload...' %
                    dst_obj_metadata.bucket)
        gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
      except AccessDeniedException:
        # Proceed with deleting the tracker file in the event that the bucket
        # exists, but the user does not have permission to view its metadata.
        pass
      except NotFoundException:
        raise
      finally:
        DeleteTrackerFile(tracker_file_name)
        logger.info('Deleted tracker file %s for resumable upload of %s before '
                    'retrying.' % (tracker_file_name, src_url.url_string))

      logger.info(
          'Restarting upload of %s from scratch (retry #%d) after exception '
          'indicating we need to start over with a new resumable upload ID: %s'
          % (src_url.url_string, num_startover_attempts, e))
      tracker_data = None
      src_obj_filestream.seek(0)
      # Reset the progress callback handler.
      component_num = _GetComponentNumber(dst_url) if is_component else None
      progress_callback = FileProgressCallbackHandler(
          gsutil_api.status_queue,
          src_url=src_url,
          component_num=component_num,
          dst_url=dst_url,
          operation_name='Uploading').call

      # Report the retryable error to the global status queue.
      PutToQueueWithTimeout(
          gsutil_api.status_queue,
          RetryableErrorMessage(e,
                                time.time(),
                                num_retries=num_startover_attempts))
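      # Back off before retrying: sleep a random duration of up to
      # 2**num_startover_attempts seconds, capped at GetMaxRetryDelay().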
      time.sleep(
          min(random.random() * (2**num_startover_attempts),
              GetMaxRetryDelay()))
    except ResumableUploadAbortException:
      retryable = False
      raise
    finally:
      if not retryable:
        DeleteTrackerFile(tracker_file_name)

  end_time = time.time()
  elapsed_time = end_time - start_time

  return (elapsed_time, uploaded_object)
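
The start-over loop above is, at its core, retry-with-full-jitter around a
resumable upload: each start-over deletes the tracker file, rewinds the source
stream, and waits a random, exponentially growing amount of time up to
GetMaxRetryDelay(). A minimal, self-contained sketch of that pattern follows;
StartOverError, attempt_upload, num_retries, and max_retry_delay are
illustrative stand-ins, not gsutil APIs:

import random
import time


class StartOverError(Exception):
  """Stand-in for ResumableUploadStartOverException."""


def retry_with_startover(attempt_upload, num_retries=6, max_retry_delay=32):
  """Calls attempt_upload() until it succeeds or retries are exhausted."""
  num_startover_attempts = 0
  while True:
    try:
      return attempt_upload()
    except StartOverError:
      num_startover_attempts += 1
      if num_startover_attempts >= num_retries:
        raise
      # Full-jitter backoff: a random delay in [0, 2**attempts), capped.
      time.sleep(min(random.random() * (2**num_startover_attempts),
                     max_retry_delay))

Full jitter spreads concurrent clients' retries apart, while the cap bounds
the worst-case wait between attempts.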