def _start_copy_from_url_options()

in azure/multiapi/storagev2/blob/v2022_11_02/_blob_client.py [0:0]


    def _start_copy_from_url_options(self, source_url, metadata=None, incremental_copy=False, **kwargs):
        # type: (str, Optional[Dict[str, str]], bool, **Any) -> Dict[str, Any]
        headers = kwargs.pop('headers', {})
        headers.update(add_metadata_headers(metadata))
        if 'source_lease' in kwargs:
            source_lease = kwargs.pop('source_lease')
            try:
                headers['x-ms-source-lease-id'] = source_lease.id
            except AttributeError:
                headers['x-ms-source-lease-id'] = source_lease

        tier = kwargs.pop('premium_page_blob_tier', None) or kwargs.pop('standard_blob_tier', None)
        tags = kwargs.pop('tags', None)

        # Options only available for sync copy
        requires_sync = kwargs.pop('requires_sync', None)
        encryption_scope_str = kwargs.pop('encryption_scope', None)
        source_authorization = kwargs.pop('source_authorization', None)
        # If tags is a str, interpret that as copy_source_tags
        copy_source_tags = isinstance(tags, str)

        if incremental_copy:
            if source_authorization:
                raise ValueError("Source authorization tokens are not applicable for incremental copying.")
            if copy_source_tags:
                raise ValueError("Copying source tags is not applicable for incremental copying.")

        # TODO: refactor start_copy_from_url api in _blob_client.py. Call _generated/_blob_operations.py copy_from_url
        #  when requires_sync=True is set.
        #  Currently both sync copy and async copy are calling _generated/_blob_operations.py start_copy_from_url.
        #  As sync copy diverges more from async copy, more problem will surface.
        if requires_sync is True:
            headers['x-ms-requires-sync'] = str(requires_sync)
            if encryption_scope_str:
                headers['x-ms-encryption-scope'] = encryption_scope_str
            if source_authorization:
                headers['x-ms-copy-source-authorization'] = source_authorization
            if copy_source_tags:
                headers['x-ms-copy-source-tag-option'] = tags
        else:
            if encryption_scope_str:
                raise ValueError(
                    "Encryption_scope is only supported for sync copy, please specify requires_sync=True")
            if source_authorization:
                raise ValueError(
                    "Source authorization tokens are only supported for sync copy, please specify requires_sync=True")
            if copy_source_tags:
                raise ValueError(
                    "Copying source tags is only supported for sync copy, please specify requires_sync=True")

        timeout = kwargs.pop('timeout', None)
        dest_mod_conditions = get_modify_conditions(kwargs)
        blob_tags_string = serialize_blob_tags_header(tags) if not copy_source_tags else None

        immutability_policy = kwargs.pop('immutability_policy', None)
        if immutability_policy:
            kwargs['immutability_policy_expiry'] = immutability_policy.expiry_time
            kwargs['immutability_policy_mode'] = immutability_policy.policy_mode

        options = {
            'copy_source': source_url,
            'seal_blob': kwargs.pop('seal_destination_blob', None),
            'timeout': timeout,
            'modified_access_conditions': dest_mod_conditions,
            'blob_tags_string': blob_tags_string,
            'headers': headers,
            'cls': return_response_headers,
        }
        if not incremental_copy:
            source_mod_conditions = get_source_conditions(kwargs)
            dest_access_conditions = get_access_conditions(kwargs.pop('destination_lease', None))
            options['source_modified_access_conditions'] = source_mod_conditions
            options['lease_access_conditions'] = dest_access_conditions
            options['tier'] = tier.value if tier else None
        options.update(kwargs)
        return options