in gslib/commands/rsync.py [0:0]
def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations.

  Args:
    cls: The Command instance running this worker.
    diff_to_apply: The RsyncDiffToApply describing the operation to perform.
    thread_state: gsutil Cloud API instance to use for this thread, if any.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  posix_attrs = diff_to_apply.src_posix_attrs
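  # Dispatch on the action computed by the differ: REMOVE deletes an extra
  # destination item, COPY transfers a source item, and MTIME_SRC_TO_DST /
  # POSIX_SRC_TO_DST patch metadata in place without re-copying object data.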
  if diff_to_apply.diff_action == DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        try:
          os.unlink(dst_url.object_name)
        except FileNotFoundError:
          # Missing file errors occur occasionally with .gstmp files
          # and can be ignored for deletes.
          cls.logger.debug('%s was already removed', dst_url)
      else:
        try:
          gsutil_api.DeleteObject(dst_url.bucket_name,
                                  dst_url.object_name,
                                  generation=dst_url.generation,
                                  provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      if src_url.IsFileUrl():
        # Try to open the local file to detect errors that would occur in
        # non-dry-run mode.
        try:
          with open(src_url.object_name, 'rb') as _:
            pass
        except Exception:  # pylint: disable=broad-except
          cls.logger.info('Could not open %s', src_url.object_name)
          raise
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
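      # Real copy: for cloud sources, fetch the object metadata up front so
      # mtime and POSIX attributes can be carried over in custom metadata.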
      try:
        src_obj_metadata = None
        if src_url.IsCloudUrl():
          src_generation = GenerationFromUrlAndString(src_url,
                                                      src_url.generation)
          src_obj_metadata = gsutil_api.GetObjectMetadata(
              src_url.bucket_name,
              src_url.object_name,
              generation=src_generation,
              provider=src_url.scheme,
              fields=cls.source_metadata_fields)
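          # Objects stored with gzip content-encoding are decompressed on
          # download, so rsync can never match them against local files
          # without re-downloading; warn about the recurring cost.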
          if ObjectIsGzipEncoded(src_obj_metadata):
            cls.logger.info(
                '%s has a compressed content-encoding, so it will be '
                'decompressed upon download; future executions of gsutil '
                'rsync with this source object will always download it. If '
                'you wish to synchronize such an object efficiently, '
                'compress the source objects in place before synchronizing '
                'them, rather than (for example) using gsutil cp -Z to '
                'compress them on-the-fly (which results in compressed '
                'content-encoding).', src_url)
        else:  # src_url.IsFileUrl()
          src_obj_metadata = apitools_messages.Object()
          # time.time() returns a float, so convert it to long before
          # comparing; warn if the file's mtime is more than a day in the
          # future.
          if posix_attrs.mtime > long(time.time()) + SECONDS_PER_DAY:
            WarnFutureTimestamp('mtime', src_url.url_string)
          if src_url.IsFifo() or src_url.IsStream():
            type_text = 'Streams' if src_url.IsStream() else 'Named pipes'
            cls.logger.warning(
                'WARNING: %s are not supported by gsutil rsync and '
                'will likely fail. Use the -x option to exclude %s by name.',
                type_text, src_url.url_string)
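        # Fold the serialized POSIX attributes into whatever custom metadata
        # the source already carries.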
        if src_obj_metadata.metadata:
          custom_metadata = src_obj_metadata.metadata
        else:
          custom_metadata = apitools_messages.Object.MetadataValue(
              additionalProperties=[])
        SerializeFileAttributesToObjectMetadata(
            posix_attrs,
            custom_metadata,
            preserve_posix=cls.preserve_posix_attrs)
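        # CopyCustomMetadata works on Object messages, so stage the merged
        # metadata in a temporary Object before copying it onto
        # src_obj_metadata.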
        tmp_obj_metadata = apitools_messages.Object()
        tmp_obj_metadata.metadata = custom_metadata
        CopyCustomMetadata(tmp_obj_metadata, src_obj_metadata, override=True)
        copy_result = copy_helper.PerformCopy(
            cls.logger,
            src_url,
            dst_url,
            gsutil_api,
            cls,
            _RsyncExceptionHandler,
            src_obj_metadata=src_obj_metadata,
            headers=cls.headers,
            is_rsync=True,
            gzip_encoded=cls.gzip_encoded,
            gzip_exts=cls.gzip_exts,
            preserve_posix=cls.preserve_posix_attrs)
        if copy_result is not None:
          (_, bytes_transferred, _, _) = copy_result
          with cls.stats_lock:
            cls.total_bytes_transferred += bytes_transferred
      except SkipUnsupportedObjectError as e:
        cls.logger.info('Skipping item %s with unsupported object type %s',
                        src_url, e.unsupported_type)
  elif diff_to_apply.diff_action == DiffAction.MTIME_SRC_TO_DST:
    # When the destination is a cloud object, patching the mtime leaves its
    # other metadata intact, unlike a full copy from source to destination,
    # which would rewrite it.
    dst_url = StorageUrlFromString(diff_to_apply.dst_url_str)
    if cls.dryrun:
      cls.logger.info('Would set mtime for %s', dst_url)
    else:
      cls.logger.info('Copying mtime from src to dst for %s',
                      dst_url.url_string)
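      # Build a minimal Object holding only the mtime key so the patch below
      # leaves every other metadata field untouched.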
      mtime = posix_attrs.mtime
      obj_metadata = apitools_messages.Object()
      obj_metadata.metadata = CreateCustomMetadata({MTIME_ATTR: mtime})
      if dst_url.IsCloudUrl():
        dst_generation = GenerationFromUrlAndString(dst_url,
                                                    dst_url.generation)
        try:
          # Assume we have permission, and can patch the object.
          gsutil_api.PatchObjectMetadata(dst_url.bucket_name,
                                         dst_url.object_name,
                                         obj_metadata,
                                         provider=dst_url.scheme,
                                         generation=dst_generation)
        except ServiceException as err:
          cls.logger.debug('Error while trying to patch: %s', err)
          # Apparently we don't have permission to patch, so the whole
          # file/object must be copied instead.
          cls.logger.info(
              'Copying whole file/object for %s instead of patching'
              ' because you don\'t have patch permission on the '
              'object.', dst_url.url_string)
          _RsyncFunc(cls,
                     RsyncDiffToApply(diff_to_apply.src_url_str,
                                      diff_to_apply.dst_url_str, posix_attrs,
                                      DiffAction.COPY, diff_to_apply.copy_size),
                     thread_state=thread_state)
      else:
        ParseAndSetPOSIXAttributes(dst_url.object_name,
                                   obj_metadata,
                                   preserve_posix=cls.preserve_posix_attrs)
  elif diff_to_apply.diff_action == DiffAction.POSIX_SRC_TO_DST:
    # As with the mtime case above, patching a cloud object's POSIX
    # attributes leaves its other metadata intact, unlike a full copy from
    # source to destination.
    dst_url = StorageUrlFromString(diff_to_apply.dst_url_str)
    if cls.dryrun:
      cls.logger.info('Would set POSIX attributes for %s', dst_url)
    else:
      cls.logger.info('Copying POSIX attributes from src to dst for %s',
                      dst_url.url_string)
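      # Serialize the full set of POSIX attributes into a fresh metadata
      # message; preserve_posix=True includes mode, uid/gid, and timestamps.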
      obj_metadata = apitools_messages.Object()
      obj_metadata.metadata = apitools_messages.Object.MetadataValue(
          additionalProperties=[])
      SerializeFileAttributesToObjectMetadata(posix_attrs,
                                              obj_metadata.metadata,
                                              preserve_posix=True)
      if dst_url.IsCloudUrl():
        dst_generation = GenerationFromUrlAndString(dst_url,
                                                    dst_url.generation)
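        # Fetch the destination ACL first; in GCS, reading an object's ACL
        # requires OWNER access, so this also probes for the ownership the
        # patch below assumes.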
        dst_obj_metadata = gsutil_api.GetObjectMetadata(
            dst_url.bucket_name,
            dst_url.object_name,
            generation=dst_generation,
            provider=dst_url.scheme,
            fields=['acl'])
        try:
          # Assume we have ownership, and can patch the object.
          gsutil_api.PatchObjectMetadata(dst_url.bucket_name,
                                         dst_url.object_name,
                                         obj_metadata,
                                         provider=dst_url.scheme,
                                         generation=dst_generation)
        except ServiceException as err:
          cls.logger.debug('Error while trying to patch: %s', err)
          # Apparently we don't have object ownership, so the whole
          # file/object must be copied instead.
          cls.logger.info(
              'Copying whole file/object for %s instead of patching'
              ' because you don\'t have patch permission on the '
              'object.', dst_url.url_string)
          _RsyncFunc(cls,
                     RsyncDiffToApply(diff_to_apply.src_url_str,
                                      diff_to_apply.dst_url_str, posix_attrs,
                                      DiffAction.COPY, diff_to_apply.copy_size),
                     thread_state=thread_state)
  else:
    raise CommandException('Got unexpected DiffAction (%d)' %
                           diff_to_apply.diff_action)