in azure/multiapi/storagev2/blob/v2022_11_02/_blob_client.py [0:0]
def _start_copy_from_url_options(self, source_url, metadata=None, incremental_copy=False, **kwargs):
    # type: (str, Optional[Dict[str, str]], bool, **Any) -> Dict[str, Any]
    """Assemble the keyword options for a start-copy-from-URL request.

    Recognised keyword arguments are popped from ``kwargs`` and converted into
    request headers or named options. Whatever is left in ``kwargs`` afterwards
    is merged into the returned dict unchanged.

    :param str source_url: Stored in the result under ``copy_source``.
    :param metadata: Passed to ``add_metadata_headers``; the mapping it returns
        is merged into the request headers.
    :param bool incremental_copy: When true, the source-condition,
        destination-lease and tier options are omitted from the result, and
        ``source_authorization`` / string ``tags`` are rejected.
    :keyword headers: Extra request headers. The mapping is copied, so the
        caller's object is not modified.
    :keyword source_lease: Either an object exposing ``id`` or the lease ID
        itself; sent as ``x-ms-source-lease-id``.
    :keyword premium_page_blob_tier: Tier for the destination; takes precedence
        over ``standard_blob_tier`` when both are truthy.
    :keyword standard_blob_tier: Tier for the destination.
    :keyword tags: A ``str`` is sent verbatim as ``x-ms-copy-source-tag-option``
        (sync copy only); any other value is passed to
        ``serialize_blob_tags_header``.
    :keyword requires_sync: Only the literal ``True`` enables the sync-copy
        headers.
    :keyword encryption_scope: Sync copy only; sent as ``x-ms-encryption-scope``.
    :keyword source_authorization: Sync copy only; sent as
        ``x-ms-copy-source-authorization``.
    :keyword immutability_policy: Object whose ``expiry_time`` and
        ``policy_mode`` are forwarded as ``immutability_policy_expiry`` and
        ``immutability_policy_mode``.
    :keyword seal_destination_blob: Forwarded as ``seal_blob``.
    :keyword destination_lease: Passed to ``get_access_conditions``
        (non-incremental copy only).
    :keyword timeout: Forwarded as ``timeout``.
    :returns: Dict of options for the generated copy operation.
    :raises ValueError: If ``source_authorization`` or string ``tags`` is
        combined with ``incremental_copy``, or if ``encryption_scope``,
        ``source_authorization`` or string ``tags`` is given without
        ``requires_sync=True``.
    """
    # Work on a copy so the caller's own ``headers`` mapping is never mutated;
    # an explicit ``headers=None`` is treated the same as "not supplied".
    headers = dict(kwargs.pop('headers', None) or {})
    headers.update(add_metadata_headers(metadata))
    if 'source_lease' in kwargs:
        source_lease = kwargs.pop('source_lease')
        # Accept either a lease object exposing ``id`` or a bare lease ID value.
        try:
            headers['x-ms-source-lease-id'] = source_lease.id
        except AttributeError:
            headers['x-ms-source-lease-id'] = source_lease
    # Pop BOTH tier keywords unconditionally. Chaining the pops with ``or``
    # short-circuits, which would leave 'standard_blob_tier' behind in kwargs
    # (and therefore in the returned options) whenever a premium tier is given.
    premium_tier = kwargs.pop('premium_page_blob_tier', None)
    standard_tier = kwargs.pop('standard_blob_tier', None)
    tier = premium_tier or standard_tier
    tags = kwargs.pop('tags', None)
    # Options only available for sync copy
    requires_sync = kwargs.pop('requires_sync', None)
    encryption_scope_str = kwargs.pop('encryption_scope', None)
    source_authorization = kwargs.pop('source_authorization', None)
    # If tags is a str, interpret that as copy_source_tags
    copy_source_tags = isinstance(tags, str)
    if incremental_copy:
        if source_authorization:
            raise ValueError("Source authorization tokens are not applicable for incremental copying.")
        if copy_source_tags:
            raise ValueError("Copying source tags is not applicable for incremental copying.")
    # TODO: refactor start_copy_from_url api in _blob_client.py. Call _generated/_blob_operations.py copy_from_url
    #  when requires_sync=True is set.
    #  Currently both sync copy and async copy are calling _generated/_blob_operations.py start_copy_from_url.
    #  As sync copy diverges more from async copy, more problem will surface.
    if requires_sync is True:
        headers['x-ms-requires-sync'] = str(requires_sync)
        if encryption_scope_str:
            headers['x-ms-encryption-scope'] = encryption_scope_str
        if source_authorization:
            headers['x-ms-copy-source-authorization'] = source_authorization
        if copy_source_tags:
            headers['x-ms-copy-source-tag-option'] = tags
    else:
        # Without requires_sync=True the sync-only options are rejected
        # instead of being silently dropped.
        if encryption_scope_str:
            raise ValueError(
                "Encryption_scope is only supported for sync copy, please specify requires_sync=True")
        if source_authorization:
            raise ValueError(
                "Source authorization tokens are only supported for sync copy, please specify requires_sync=True")
        if copy_source_tags:
            raise ValueError(
                "Copying source tags is only supported for sync copy, please specify requires_sync=True")
    timeout = kwargs.pop('timeout', None)
    # NOTE(review): get_modify_conditions / get_source_conditions receive the
    # live kwargs dict and presumably pop the condition keywords they
    # recognise, so the call order is kept as in the original - confirm.
    dest_mod_conditions = get_modify_conditions(kwargs)
    # String tags were already sent as a header above; only non-string tags
    # are serialized into the tags option.
    blob_tags_string = None if copy_source_tags else serialize_blob_tags_header(tags)
    immutability_policy = kwargs.pop('immutability_policy', None)
    if immutability_policy:
        # Flattened back into kwargs so they reach the result via options.update(kwargs).
        kwargs['immutability_policy_expiry'] = immutability_policy.expiry_time
        kwargs['immutability_policy_mode'] = immutability_policy.policy_mode
    options = {
        'copy_source': source_url,
        'seal_blob': kwargs.pop('seal_destination_blob', None),
        'timeout': timeout,
        'modified_access_conditions': dest_mod_conditions,
        'blob_tags_string': blob_tags_string,
        'headers': headers,
        'cls': return_response_headers,
    }
    if not incremental_copy:
        source_mod_conditions = get_source_conditions(kwargs)
        dest_access_conditions = get_access_conditions(kwargs.pop('destination_lease', None))
        options['source_modified_access_conditions'] = source_mod_conditions
        options['lease_access_conditions'] = dest_access_conditions
        # Accept an object exposing ``value`` or a plain value such as a str,
        # mirroring how source_lease accepts both an object and a raw ID.
        options['tier'] = getattr(tier, 'value', tier) if tier else None
    options.update(kwargs)
    return options