in gslib/utils/copy_helper.py [0:0]
def PerformCopy(logger,
src_url,
dst_url,
gsutil_api,
command_obj,
copy_exception_handler,
src_obj_metadata=None,
allow_splitting=True,
headers=None,
manifest=None,
gzip_exts=None,
is_rsync=False,
preserve_posix=False,
gzip_encoded=False,
use_stet=False):
"""Performs copy from src_url to dst_url, handling various special cases.
Args:
logger: for outputting log messages.
src_url: Source StorageUrl.
dst_url: Destination StorageUrl.
gsutil_api: gsutil Cloud API instance to use for the copy.
command_obj: command object for use in Apply in parallel composite uploads
and sliced object downloads.
copy_exception_handler: for handling copy exceptions during Apply.
src_obj_metadata: If source URL is a cloud object, source object metadata
with all necessary fields (per GetSourceFieldsNeededForCopy).
Required for cloud source URLs. If source URL is a file, an
apitools Object that may contain file size, or None.
allow_splitting: Whether to allow the file to be split into component
        pieces for a parallel composite upload or download.
headers: optional headers to use for the copy operation.
manifest: optional manifest for tracking copy operations.
gzip_exts: List of file extensions to gzip, if any.
If gzip_exts is GZIP_ALL_FILES, gzip all files.
is_rsync: Whether or not the caller is the rsync command.
    preserve_posix: Whether or not to preserve POSIX attributes.
gzip_encoded: Whether to use gzip transport encoding for the upload. Used
        in conjunction with gzip_exts. Compressed streaming uploads are only
        supported by the JSON GCS API.
use_stet: If True, will perform STET encryption or decryption using
        the binary specified in the boto config or PATH.

  Returns:
(elapsed_time, bytes_transferred, version-specific dst_url) excluding
    overhead like initial GET.

  Raises:
    ItemExistsError: if the no-clobber flag (-n) is specified and the
        destination object already exists.
SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
and the source is an unsupported type.
CommandException: if other errors encountered.
"""
# TODO: Remove elapsed_time as it is currently unused by all callers.
if headers:
dst_obj_headers = headers.copy()
else:
dst_obj_headers = {}
# Create a metadata instance for each destination object so metadata
# such as content-type can be applied per-object.
# Initialize metadata from any headers passed in via -h.
dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)
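  # Object-generation/metageneration preconditions are only supported by GCS,
  # so only parse them from the headers when the destination is a gs:// URL.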
if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
preconditions = PreconditionsFromHeaders(dst_obj_headers)
else:
preconditions = Preconditions()
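  # State that depends on the source type: an open stream for file sources,
  # a CSEK decryption key for encrypted cloud sources, and whether the copy
  # can be performed entirely within the cloud.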
src_obj_filestream = None
decryption_key = None
copy_in_the_cloud = False
if src_url.IsCloudUrl():
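    # A copy between cloud URLs with the same scheme can be performed
    # entirely server-side, unless daisy-chain mode forces the bytes through
    # this machine.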
if (dst_url.IsCloudUrl() and src_url.scheme == dst_url.scheme and
not global_copy_helper_opts.daisy_chain):
copy_in_the_cloud = True
if global_copy_helper_opts.perform_mv:
WarnIfMvEarlyDeletionChargeApplies(src_url, src_obj_metadata, logger)
MaybeSkipUnsupportedObject(src_url, src_obj_metadata)
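    # If the source object is CSEK-encrypted, look up a matching decryption
    # key (from the boto config) to supply to the download or copy.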
decryption_key = GetDecryptionCSEK(src_url, src_obj_metadata)
src_obj_size = src_obj_metadata.size
dst_obj_metadata.contentType = src_obj_metadata.contentType
if global_copy_helper_opts.preserve_acl and dst_url.IsCloudUrl():
if src_url.scheme == 'gs' and not src_obj_metadata.acl:
raise CommandException(
'No OWNER permission found for object %s. OWNER permission is '
'required for preserving ACLs.' % src_url)
dst_obj_metadata.acl = src_obj_metadata.acl
# Special case for S3-to-S3 copy URLs using
# global_copy_helper_opts.preserve_acl.
# dst_url will be verified in _CopyObjToObjDaisyChainMode if it
# is not s3 (and thus differs from src_url).
if src_url.scheme == 's3':
acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
if acl_text:
AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
else: # src_url.IsFileUrl()
if use_stet:
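      # The STET binary encrypts the source into a temporary file; that
      # ciphertext, rather than the original file, is what gets uploaded.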
source_stream_url = stet_util.encrypt_upload(src_url, dst_url, logger)
else:
source_stream_url = src_url
try:
src_obj_filestream = GetStreamFromFileUrl(source_stream_url)
except Exception as e: # pylint: disable=broad-except
message = 'Error opening file "%s": %s.' % (src_url, str(e))
if command_obj.continue_on_error:
command_obj.op_failure_count += 1
logger.error(message)
return
else:
raise CommandException(message)
if src_url.IsStream() or src_url.IsFifo():
src_obj_size = None
elif src_obj_metadata and src_obj_metadata.size and not use_stet:
      # The iterator already retrieved the file's size, so there is no need
      # to stat the file again (unless STET changed its size, hence the
      # use_stet check above).
src_obj_size = src_obj_metadata.size
else:
src_obj_size = os.path.getsize(source_stream_url.object_name)
if global_copy_helper_opts.use_manifest:
# Set the source size in the manifest.
manifest.Set(src_url.url_string, 'size', src_obj_size)
  if (dst_url.scheme == 's3' and src_url.scheme != 's3' and
src_obj_size is not None and # Can't compare int to None in py3
src_obj_size > S3_MAX_UPLOAD_SIZE):
raise CommandException(
'"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 '
'objects greater than %s in size require multipart uploads, which '
'gsutil does not support.' %
(src_url, MakeHumanReadable(S3_MAX_UPLOAD_SIZE)))
# On Windows, stdin is opened as text mode instead of binary which causes
# problems when piping a binary file, so this switches it to binary mode.
if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)
if global_copy_helper_opts.no_clobber:
    # There are two checks to prevent clobbering:
    # 1) The first check tests whether the destination URL already exists
    #    and, if it does, prevents the upload/download from happening at all.
    #    This is done by the existence checks below.
    # 2) The second check is only relevant if we are writing to gs. We can
    #    enforce that the server only writes the object if it doesn't already
    #    exist by specifying the precondition set below. This check happens
    #    at the server only after the complete file has been uploaded, and
    #    prevents a race condition where a destination object is created
    #    after the first check but before the upload finishes.
    # We perform both checks to save on unnecessary uploads/downloads, at the
    # possible cost of additional HTTP calls.
if preconditions.gen_match:
raise ArgumentException('Specifying x-goog-if-generation-match is '
'not supported with cp -n')
else:
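      # A generation-match precondition of 0 tells the service to write the
      # object only if no live version of it already exists.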
preconditions.gen_match = 0
if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
raise ItemExistsError()
elif dst_url.IsCloudUrl():
try:
dst_object = gsutil_api.GetObjectMetadata(dst_url.bucket_name,
dst_url.object_name,
provider=dst_url.scheme)
except NotFoundException:
dst_object = None
if dst_object:
raise ItemExistsError()
if dst_url.IsCloudUrl():
# Cloud storage API gets object and bucket name from metadata.
dst_obj_metadata.name = dst_url.object_name
dst_obj_metadata.bucket = dst_url.bucket_name
if src_url.IsCloudUrl():
# Preserve relevant metadata from the source object if it's not already
# provided from the headers.
src_obj_metadata.name = src_url.object_name
src_obj_metadata.bucket = src_url.bucket_name
else:
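      # If a content type wasn't supplied via headers, infer it from the
      # local file (by extension or, if the use_magicfile boto option is
      # set, via the `file` command).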
_SetContentTypeFromFile(src_url, dst_obj_metadata)
# Only set KMS key name if destination provider is 'gs'.
encryption_keywrapper = GetEncryptionKeyWrapper(config)
if (encryption_keywrapper and
encryption_keywrapper.crypto_type == CryptoKeyType.CMEK and
dst_url.scheme == 'gs'):
dst_obj_metadata.kmsKeyName = encryption_keywrapper.crypto_key
if src_obj_metadata:
# Note that CopyObjectMetadata only copies specific fields. We intentionally
# do not copy storageClass, as the bucket's default storage class should be
# used (when copying to a gs:// bucket) unless explicitly overridden.
CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
if global_copy_helper_opts.dest_storage_class:
dst_obj_metadata.storageClass = global_copy_helper_opts.dest_storage_class
if config.get('GSUtil', 'check_hashes') == CHECK_HASH_NEVER:
# GCS server will perform MD5 validation if the md5 hash is present.
# Remove md5_hash if check_hashes=never.
dst_obj_metadata.md5Hash = None
_LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)
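  # Dispatch based on the source and destination URL types: cloud-to-local
  # download, same-provider copy in the cloud, cross-provider daisy-chain
  # copy, local-to-cloud upload, or local file-to-file copy.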
if src_url.IsCloudUrl():
if dst_url.IsFileUrl():
PutToQueueWithTimeout(
gsutil_api.status_queue,
FileMessage(src_url,
dst_url,
time.time(),
message_type=FileMessage.FILE_DOWNLOAD,
size=src_obj_size,
finished=False))
return _DownloadObjectToFile(src_url,
src_obj_metadata,
dst_url,
gsutil_api,
logger,
command_obj,
copy_exception_handler,
allow_splitting=allow_splitting,
decryption_key=decryption_key,
is_rsync=is_rsync,
preserve_posix=preserve_posix,
use_stet=use_stet)
elif copy_in_the_cloud:
PutToQueueWithTimeout(
gsutil_api.status_queue,
FileMessage(src_url,
dst_url,
time.time(),
message_type=FileMessage.FILE_CLOUD_COPY,
size=src_obj_size,
finished=False))
return _CopyObjToObjInTheCloud(src_url,
src_obj_metadata,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
decryption_key=decryption_key)
else:
PutToQueueWithTimeout(
gsutil_api.status_queue,
FileMessage(src_url,
dst_url,
time.time(),
message_type=FileMessage.FILE_DAISY_COPY,
size=src_obj_size,
finished=False))
return _CopyObjToObjDaisyChainMode(src_url,
src_obj_metadata,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
logger,
decryption_key=decryption_key)
else: # src_url.IsFileUrl()
if dst_url.IsCloudUrl():
# The FileMessage for this upload object is inside _UploadFileToObject().
      # This is because the function may alter src_url, which would prevent
      # us from correctly tracking the new URL.
uploaded_metadata = _UploadFileToObject(src_url,
src_obj_filestream,
src_obj_size,
dst_url,
dst_obj_metadata,
preconditions,
gsutil_api,
logger,
command_obj,
copy_exception_handler,
gzip_exts=gzip_exts,
allow_splitting=allow_splitting,
gzip_encoded=gzip_encoded)
if use_stet:
        # Delete the temporary ciphertext file that stet_util.encrypt_upload
        # created for this upload.
os.unlink(src_obj_filestream.name)
return uploaded_metadata
else: # dst_url.IsFileUrl()
PutToQueueWithTimeout(
gsutil_api.status_queue,
FileMessage(src_url,
dst_url,
time.time(),
message_type=FileMessage.FILE_LOCAL_COPY,
size=src_obj_size,
finished=False))
result = _CopyFileToFile(src_url,
dst_url,
status_queue=gsutil_api.status_queue,
src_obj_metadata=src_obj_metadata)
# Need to let _CopyFileToFile return before setting the POSIX attributes.
if not src_url.IsStream() and not dst_url.IsStream():
ParseAndSetPOSIXAttributes(dst_url.object_name,
src_obj_metadata,
is_rsync=is_rsync,
preserve_posix=preserve_posix)
return result