in gslib/gcs_json_api.py [0:0]
def _PerformResumableUpload(self, upload_stream, authorized_upload_http,
content_type, size, serialization_data,
apitools_strategy, apitools_request,
global_params, bytes_uploaded_container,
tracker_callback, addl_headers, progress_callback,
gzip_encoded):
try:
if serialization_data:
# Resuming an existing upload.
apitools_upload = apitools_transfer.Upload.FromData(
upload_stream,
serialization_data,
self.api_client.http,
num_retries=self.num_retries,
gzip_encoded=gzip_encoded,
client=self.api_client)
apitools_upload.chunksize = GetJsonResumableChunkSize()
apitools_upload.bytes_http = authorized_upload_http
else:
# New resumable upload.
apitools_upload = apitools_transfer.Upload(
upload_stream,
content_type,
total_size=size,
chunksize=GetJsonResumableChunkSize(),
auto_transfer=False,
num_retries=self.num_retries,
gzip_encoded=gzip_encoded)
apitools_upload.strategy = apitools_strategy
apitools_upload.bytes_http = authorized_upload_http
with self._ApitoolsRequestHeaders(addl_headers):
self.api_client.objects.Insert(apitools_request,
upload=apitools_upload,
global_params=global_params)
# Disable retries in apitools. We will handle them explicitly here.
apitools_upload.retry_func = LogAndHandleRetries(
is_data_transfer=True, status_queue=self.status_queue)
# Disable apitools' default print callbacks.
def _NoOpCallback(unused_response, unused_upload_object):
pass
# If we're resuming an upload, apitools has at this point received
# from the server how many bytes it already has. Update our
# callback class with this information.
bytes_uploaded_container.bytes_transferred = apitools_upload.progress
if tracker_callback:
tracker_callback(json.dumps(apitools_upload.serialization_data))
retries = 0
last_progress_byte = apitools_upload.progress
while retries <= self.num_retries:
try:
# TODO: On retry, this will seek to the bytes that the server has,
# causing the hash to be recalculated. Make HashingFileUploadWrapper
# save a digest according to json_resumable_chunk_size.
if not gzip_encoded and size and not JsonResumableChunkSizeDefined():
# If size is known and the request doesn't need to be compressed,
# we can send it all in one request and avoid making a
# round-trip per chunk. Compression is not supported for
# non-chunked streaming uploads because supporting resumability
# for that feature results in degraded upload performance and
# adds significant complexity to the implementation.
http_response = apitools_upload.StreamMedia(
callback=_NoOpCallback,
finish_callback=_NoOpCallback,
additional_headers=addl_headers)
else:
# Otherwise it's a streaming request and we need to ensure that we
# send the bytes in chunks so that we can guarantee that we never
# need to seek backwards more than our buffer (and also that the
# chunks are aligned to 256KB).
http_response = apitools_upload.StreamInChunks(
callback=_NoOpCallback,
finish_callback=_NoOpCallback,
additional_headers=addl_headers)
processed_response = self.api_client.objects.ProcessHttpResponse(
self.api_client.objects.GetMethodConfig('Insert'), http_response)
if size is None and progress_callback:
# Make final progress callback; total size should now be known.
# This works around the fact the send function counts header bytes.
# However, this will make the progress appear to go slightly
# backwards at the end.
progress_callback(apitools_upload.total_size,
apitools_upload.total_size)
return processed_response
except HTTP_TRANSFER_EXCEPTIONS as e:
self._ValidateHttpAccessTokenRefreshError(e)
apitools_http_wrapper.RebuildHttpConnections(
apitools_upload.bytes_http)
while retries <= self.num_retries:
try:
# TODO: Simulate the refresh case in tests. Right now, our
# mocks are not complex enough to simulate a failure.
apitools_upload.RefreshResumableUploadState()
start_byte = apitools_upload.progress
bytes_uploaded_container.bytes_transferred = start_byte
break
except HTTP_TRANSFER_EXCEPTIONS as e2:
self._ValidateHttpAccessTokenRefreshError(e2)
apitools_http_wrapper.RebuildHttpConnections(
apitools_upload.bytes_http)
retries += 1
if retries > self.num_retries:
raise ResumableUploadException(
'Transfer failed after %d retries. Final exception: %s' %
(self.num_retries, e2))
time.sleep(
CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
if start_byte > last_progress_byte:
# We've made progress, so allow a fresh set of retries.
last_progress_byte = start_byte
retries = 0
else:
retries += 1
if retries > self.num_retries:
raise ResumableUploadException(
'Transfer failed after %d retries. Final exception: %s' %
(self.num_retries, GetPrintableExceptionString(e)))
time.sleep(
CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(
'Retrying upload from byte %s after exception: %s. Trace: %s',
start_byte, GetPrintableExceptionString(e),
traceback.format_exc())
except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
resumable_ex = self._TranslateApitoolsResumableUploadException(e)
if resumable_ex:
raise resumable_ex
else:
raise