in gslib/utils/copy_helper.py [0:0]
def _UploadFileToObject(src_url,
src_obj_filestream,
src_obj_size,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
logger,
command_obj,
copy_exception_handler,
gzip_exts=None,
allow_splitting=True,
is_component=False,
gzip_encoded=False):
"""Uploads a local file to an object.
Args:
src_url: Source FileUrl.
src_obj_filestream: Read stream of the source file to be read and closed.
src_obj_size (int or None): Size of the source file.
dst_url: Destination CloudUrl.
dst_obj_metadata: Metadata to be applied to the destination object.
preconditions: Preconditions to use for the copy.
gsutil_api: gsutil Cloud API to use for the copy.
logger: for outputting log messages.
command_obj: command object for use in Apply in parallel composite uploads.
copy_exception_handler: For handling copy exceptions during Apply.
gzip_exts: List of file extensions to gzip prior to upload, if any.
If gzip_exts is GZIP_ALL_FILES, gzip all files.
allow_splitting: Whether to allow the file to be split into component
pieces for an parallel composite upload.
is_component: indicates whether this is a single component or whole file.
gzip_encoded: Whether to use gzip transport encoding for the upload. Used
in conjunction with gzip_exts for selecting which files will be
encoded. Streaming files compressed is only supported on the JSON GCS
API.
Returns:
(elapsed_time, bytes_transferred, dst_url with generation,
md5 hash of destination) excluding overhead like initial GET.
Raises:
CommandException: if errors encountered.
"""
if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
content_language = config.get_value('GSUtil', 'content_language')
if content_language:
dst_obj_metadata.contentLanguage = content_language
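# Default to uploading the source file as-is; the compression handling below
# may substitute a temporary gzipped file and adjust the size accordingly.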
upload_url = src_url
upload_stream = src_obj_filestream
upload_size = src_obj_size
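# Choose between gzip transport encoding (gzip_encoded_file) and compressing
# the whole file before upload (zipped_file), based on gzip_encoded,
# gzip_exts, and whether this call is for a component.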
zipped_file, gzip_encoded_file = _SelectUploadCompressionStrategy(
src_url.object_name, is_component, gzip_exts, gzip_encoded)
# The component's parent already printed this debug message.
if gzip_encoded_file and not is_component:
logger.debug('Using compressed transport encoding for %s.', src_url)
elif zipped_file:
upload_url, upload_stream, upload_size = _ApplyZippedUploadCompression(
src_url, src_obj_filestream, src_obj_size, logger)
dst_obj_metadata.contentEncoding = 'gzip'
# If we're sending an object with gzip encoding, it's possible it also
# has an incompressible content type. Google Cloud Storage will remove
# the top layer of compression when serving the object, which would cause
# the served content not to match the CRC32C/MD5 hashes stored and make
# integrity checking impossible. Therefore we set cache control to
# no-transform to ensure it is served in its original form. The caveat is
# that to read this object, other clients must then support
# accept-encoding:gzip.
if not dst_obj_metadata.cacheControl:
dst_obj_metadata.cacheControl = 'no-transform'
elif 'no-transform' not in dst_obj_metadata.cacheControl.lower():
dst_obj_metadata.cacheControl += ',no-transform'
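# For whole-file uploads, report the start of the transfer on the status
# queue.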
if not is_component:
PutToQueueWithTimeout(
gsutil_api.status_queue,
FileMessage(upload_url,
dst_url,
time.time(),
message_type=FileMessage.FILE_UPLOAD,
size=upload_size,
finished=False))
elapsed_time = None
uploaded_object = None
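# Create one digester per configured hash algorithm so the uploaded data can
# be integrity-checked against the destination object's hashes.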
hash_algs = GetUploadHashAlgs()
digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
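# Decide whether to do a parallel composite upload, i.e. upload the file as
# multiple components that are composed into the final object in the cloud.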
parallel_composite_upload = _ShouldDoParallelCompositeUpload(
logger,
allow_splitting,
upload_url,
dst_url,
src_obj_size,
gsutil_api,
canned_acl=global_copy_helper_opts.canned_acl)
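# Uploads below the resumable threshold, as well as streams and FIFOs, use
# the non-resumable (single-request) strategy.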
non_resumable_upload = ((0 if upload_size is None else upload_size)
< ResumableThreshold() or src_url.IsStream() or
src_url.IsFifo())
if ((src_url.IsStream() or src_url.IsFifo()) and
gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON):
orig_stream = upload_stream
# Add limited seekable properties to the stream via buffering.
upload_stream = ResumableStreamingJsonUploadWrapper(
orig_stream, GetJsonResumableChunkSize())
if not parallel_composite_upload and len(hash_algs):
# Parallel composite uploads calculate hashes per-component in subsequent
# calls to this function, but the composition of the final object is a
# cloud-only operation.
wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
hash_algs, upload_url, logger)
else:
wrapped_filestream = upload_stream
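# Wrap each upload strategy in a zero-argument callable; the appropriate one
# is selected below and executed through the shared delegate.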
def CallParallelCompositeUpload():
return _DoParallelCompositeUpload(upload_stream,
upload_url,
dst_url,
dst_obj_metadata,
global_copy_helper_opts.canned_acl,
upload_size,
preconditions,
gsutil_api,
command_obj,
copy_exception_handler,
logger,
gzip_encoded=gzip_encoded_file)
def CallNonResumableUpload():
return _UploadFileToObjectNonResumable(upload_url,
wrapped_filestream,
upload_size,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
gzip_encoded=gzip_encoded_file)
def CallResumableUpload():
return _UploadFileToObjectResumable(upload_url,
wrapped_filestream,
upload_size,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
logger,
is_component=is_component,
gzip_encoded=gzip_encoded_file)
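# Pick the strategy: parallel composite when eligible, otherwise
# non-resumable for small or streamed sources, otherwise resumable.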
if parallel_composite_upload:
delegate = CallParallelCompositeUpload
elif non_resumable_upload:
delegate = CallNonResumableUpload
else:
delegate = CallResumableUpload
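# Run the chosen upload through the delegate helper, which also receives the
# stream and compression flags so it can handle any cleanup once the upload
# finishes.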
elapsed_time, uploaded_object = _DelegateUploadFileToObject(
delegate, upload_url, upload_stream, zipped_file, gzip_encoded_file,
parallel_composite_upload, logger)
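# For non-composite uploads, verify integrity by comparing the locally
# computed digests against the hashes reported for the destination object.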
if not parallel_composite_upload:
try:
digests = _CreateDigestsFromDigesters(digesters)
_CheckHashes(logger,
dst_url,
uploaded_object,
src_url.object_name,
digests,
is_upload=True)
except HashMismatchException:
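# When _RENAME_ON_HASH_MISMATCH is set, preserve the corrupted object under a
# renamed key for later inspection; the mismatched object is then deleted and
# the exception re-raised.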
if _RENAME_ON_HASH_MISMATCH:
corrupted_obj_metadata = apitools_messages.Object(
name=dst_obj_metadata.name,
bucket=dst_obj_metadata.bucket,
etag=uploaded_object.etag)
dst_obj_metadata.name = (dst_url.object_name +
_RENAME_ON_HASH_MISMATCH_SUFFIX)
gsutil_api.CopyObject(corrupted_obj_metadata,
dst_obj_metadata,
provider=dst_url.scheme)
# If the digest doesn't match, delete the object.
gsutil_api.DeleteObject(dst_url.bucket_name,
dst_url.object_name,
generation=uploaded_object.generation,
provider=dst_url.scheme)
raise
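# Return a URL that carries the generation of the newly written object.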
result_url = dst_url.Clone()
result_url.generation = GenerationFromUrlAndString(result_url,
uploaded_object.generation)
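# For whole-file uploads, report completion of the transfer on the status
# queue.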
if not is_component:
PutToQueueWithTimeout(
gsutil_api.status_queue,
FileMessage(upload_url,
dst_url,
time.time(),
message_type=FileMessage.FILE_UPLOAD,
size=upload_size,
finished=True))
return (elapsed_time, uploaded_object.size, result_url,
uploaded_object.md5Hash)