def _PerformResumableUpload()

in gslib/gcs_json_api.py


  def _PerformResumableUpload(self, upload_stream, authorized_upload_http,
                              content_type, size, serialization_data,
                              apitools_strategy, apitools_request,
                              global_params, bytes_uploaded_container,
                              tracker_callback, addl_headers, progress_callback,
                              gzip_encoded):
    try:
      if serialization_data:
        # Resuming an existing upload.
        apitools_upload = apitools_transfer.Upload.FromData(
            upload_stream,
            serialization_data,
            self.api_client.http,
            num_retries=self.num_retries,
            gzip_encoded=gzip_encoded,
            client=self.api_client)
        apitools_upload.chunksize = GetJsonResumableChunkSize()
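        # Media bytes are sent over the separately authorized HTTP object;
        # metadata requests continue to go through self.api_client.http.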
        apitools_upload.bytes_http = authorized_upload_http
      else:
        # New resumable upload.
        apitools_upload = apitools_transfer.Upload(
            upload_stream,
            content_type,
            total_size=size,
            chunksize=GetJsonResumableChunkSize(),
            auto_transfer=False,
            num_retries=self.num_retries,
            gzip_encoded=gzip_encoded)
        apitools_upload.strategy = apitools_strategy
        apitools_upload.bytes_http = authorized_upload_http
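        # With auto_transfer disabled, this Insert call only initiates the
        # resumable upload session (obtaining the upload URL); the object
        # bytes themselves are streamed further below.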
        with self._ApitoolsRequestHeaders(addl_headers):
          self.api_client.objects.Insert(apitools_request,
                                         upload=apitools_upload,
                                         global_params=global_params)
      # Disable retries in apitools. We will handle them explicitly here.
      apitools_upload.retry_func = LogAndHandleRetries(
          is_data_transfer=True, status_queue=self.status_queue)

      # Disable apitools' default print callbacks.
      def _NoOpCallback(unused_response, unused_upload_object):
        pass

      # If we're resuming an upload, apitools has at this point learned from
      # the server how many bytes it already has. Update our callback class
      # with this information.
      bytes_uploaded_container.bytes_transferred = apitools_upload.progress

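      # Persist the serialization data (e.g. in a tracker file) so that a
      # later invocation can resume this upload via the serialization_data
      # branch above.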
      if tracker_callback:
        tracker_callback(json.dumps(apitools_upload.serialization_data))

      retries = 0
      last_progress_byte = apitools_upload.progress
      while retries <= self.num_retries:
        try:
          # TODO: On retry, this will seek to the bytes that the server has,
          # causing the hash to be recalculated. Make HashingFileUploadWrapper
          # save a digest according to json_resumable_chunk_size.
          if not gzip_encoded and size and not JsonResumableChunkSizeDefined():
            # If size is known and the request doesn't need to be compressed,
            # we can send it all in one request and avoid making a
            # round-trip per chunk. Compression is not supported for
            # non-chunked streaming uploads because supporting resumability
            # for that feature results in degraded upload performance and
            # adds significant complexity to the implementation.
            http_response = apitools_upload.StreamMedia(
                callback=_NoOpCallback,
                finish_callback=_NoOpCallback,
                additional_headers=addl_headers)
          else:
            # Otherwise it's a streaming request and we need to ensure that we
            # send the bytes in chunks so that we can guarantee that we never
            # need to seek backwards more than our buffer (and also that the
            # chunks are aligned to 256KB).
            http_response = apitools_upload.StreamInChunks(
                callback=_NoOpCallback,
                finish_callback=_NoOpCallback,
                additional_headers=addl_headers)
          processed_response = self.api_client.objects.ProcessHttpResponse(
              self.api_client.objects.GetMethodConfig('Insert'), http_response)
          if size is None and progress_callback:
            # Make final progress callback; total size should now be known.
            # This works around the fact that the send function counts header
            # bytes. However, it will make the progress appear to go slightly
            # backwards at the end.
            progress_callback(apitools_upload.total_size,
                              apitools_upload.total_size)
          return processed_response
        except HTTP_TRANSFER_EXCEPTIONS as e:
          self._ValidateHttpAccessTokenRefreshError(e)
          apitools_http_wrapper.RebuildHttpConnections(
              apitools_upload.bytes_http)
          while retries <= self.num_retries:
            try:
              # TODO: Simulate the refresh case in tests. Right now, our
              # mocks are not complex enough to simulate a failure.
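              # Ask the server how many bytes of this upload it has already
              # persisted so that the transfer can resume from that offset.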
              apitools_upload.RefreshResumableUploadState()
              start_byte = apitools_upload.progress
              bytes_uploaded_container.bytes_transferred = start_byte
              break
            except HTTP_TRANSFER_EXCEPTIONS as e2:
              self._ValidateHttpAccessTokenRefreshError(e2)
              apitools_http_wrapper.RebuildHttpConnections(
                  apitools_upload.bytes_http)
              retries += 1
              if retries > self.num_retries:
                raise ResumableUploadException(
                    'Transfer failed after %d retries. Final exception: %s' %
                    (self.num_retries, e2))
              time.sleep(
                  CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
          if start_byte > last_progress_byte:
            # We've made progress, so allow a fresh set of retries.
            last_progress_byte = start_byte
            retries = 0
          else:
            retries += 1
            if retries > self.num_retries:
              raise ResumableUploadException(
                  'Transfer failed after %d retries. Final exception: %s' %
                  (self.num_retries, GetPrintableExceptionString(e)))
            time.sleep(
                CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
          if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                'Retrying upload from byte %s after exception: %s. Trace: %s',
                start_byte, GetPrintableExceptionString(e),
                traceback.format_exc())
    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
      resumable_ex = self._TranslateApitoolsResumableUploadException(e)
      if resumable_ex:
        raise resumable_ex
      else:
        raise
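
The retry loop above follows a pattern worth calling out: the retry budget is
reset whenever the server reports forward progress (start_byte >
last_progress_byte), and each failed attempt without progress backs off via
CalculateWaitForRetry before trying again. The standalone sketch below
illustrates only that pattern; the helper names (upload_with_progress_reset,
_wait_for_retry, send_all_bytes, query_progress) are hypothetical, the
jittered exponential backoff is an assumption, and none of it is gsutil's or
apitools' actual API.

import random
import time


def _wait_for_retry(retry_attempt, max_wait=32):
  # Jittered exponential backoff; an illustrative assumption, not
  # necessarily the formula used by gsutil's CalculateWaitForRetry.
  return min(random.random() * (2 ** retry_attempt), max_wait)


def upload_with_progress_reset(send_all_bytes, query_progress, num_retries=6):
  """Retries send_all_bytes(), resetting the retry budget on server progress.

  Args:
    send_all_bytes: callable that attempts to finish the upload and raises
      IOError on a (possibly transient) failure.
    query_progress: callable returning the byte offset the server has
      persisted so far (analogous to RefreshResumableUploadState followed by
      reading apitools_upload.progress).
    num_retries: retries allowed without any forward progress.
  """
  retries = 0
  last_progress_byte = query_progress()
  while retries <= num_retries:
    try:
      return send_all_bytes()
    except IOError as e:
      start_byte = query_progress()
      if start_byte > last_progress_byte:
        # The server persisted more bytes since the last attempt, so allow a
        # fresh set of retries.
        last_progress_byte = start_byte
        retries = 0
      else:
        retries += 1
        if retries > num_retries:
          raise RuntimeError(
              'Transfer failed after %d retries. Final exception: %s' %
              (num_retries, e))
        time.sleep(_wait_for_retry(retries))

In the method above, send_all_bytes corresponds to StreamMedia or
StreamInChunks, and query_progress to RefreshResumableUploadState plus
apitools_upload.progress; the extra inner loop in the real code exists only
because refreshing the upload state can itself fail and needs its own retries.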