def _upload_blob_options()

in azure/multiapi/storagev2/blob/v2021_08_06/_blob_client.py [0:0]


    def _upload_blob_options(  # pylint:disable=too-many-statements
            self, data,  # type: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]]
            blob_type=BlobType.BlockBlob,  # type: Union[str, BlobType]
            length=None,  # type: Optional[int]
            metadata=None,  # type: Optional[Dict[str, str]]
            **kwargs
        ):
        # type: (...) -> Dict[str, Any]
        if self.require_encryption and not self.key_encryption_key:
            raise ValueError("Encryption required but no key was provided.")
        encryption_options = {
            'required': self.require_encryption,
            'version': self.encryption_version,
            'key': self.key_encryption_key,
            'resolver': self.key_resolver_function,
        }

        encoding = kwargs.pop('encoding', 'UTF-8')
        if isinstance(data, six.text_type):
            data = data.encode(encoding) # type: ignore
        if length is None:
            length = get_length(data)
        if isinstance(data, bytes):
            data = data[:length]

        if isinstance(data, bytes):
            stream = BytesIO(data)
        elif hasattr(data, 'read'):
            stream = data
        elif hasattr(data, '__iter__') and not isinstance(data, (list, tuple, set, dict)):
            stream = IterStreamer(data, encoding=encoding)
        else:
            raise TypeError("Unsupported data type: {}".format(type(data)))

        validate_content = kwargs.pop('validate_content', False)
        content_settings = kwargs.pop('content_settings', None)
        overwrite = kwargs.pop('overwrite', False)
        max_concurrency = kwargs.pop('max_concurrency', 1)
        cpk = kwargs.pop('cpk', None)
        cpk_info = None
        if cpk:
            if self.scheme.lower() != 'https':
                raise ValueError("Customer provided encryption key must be used over HTTPS.")
            cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                               encryption_algorithm=cpk.algorithm)
        kwargs['cpk_info'] = cpk_info

        headers = kwargs.pop('headers', {})
        headers.update(add_metadata_headers(metadata))
        kwargs['lease_access_conditions'] = get_access_conditions(kwargs.pop('lease', None))
        kwargs['modified_access_conditions'] = get_modify_conditions(kwargs)
        kwargs['cpk_scope_info'] = get_cpk_scope_info(kwargs)
        if content_settings:
            kwargs['blob_headers'] = BlobHTTPHeaders(
                blob_cache_control=content_settings.cache_control,
                blob_content_type=content_settings.content_type,
                blob_content_md5=content_settings.content_md5,
                blob_content_encoding=content_settings.content_encoding,
                blob_content_language=content_settings.content_language,
                blob_content_disposition=content_settings.content_disposition
            )
        kwargs['blob_tags_string'] = serialize_blob_tags_header(kwargs.pop('tags', None))
        kwargs['stream'] = stream
        kwargs['length'] = length
        kwargs['overwrite'] = overwrite
        kwargs['headers'] = headers
        kwargs['validate_content'] = validate_content
        kwargs['blob_settings'] = self._config
        kwargs['max_concurrency'] = max_concurrency
        kwargs['encryption_options'] = encryption_options

        if blob_type == BlobType.BlockBlob:
            kwargs['client'] = self._client.block_blob
            kwargs['data'] = data
        elif blob_type == BlobType.PageBlob:
            if self.encryption_version == '2.0' and (self.require_encryption or self.key_encryption_key is not None):
                raise ValueError("Encryption version 2.0 does not currently support page blobs.")
            kwargs['client'] = self._client.page_blob
        elif blob_type == BlobType.AppendBlob:
            if self.require_encryption or (self.key_encryption_key is not None):
                raise ValueError(_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION)
            kwargs['client'] = self._client.append_blob
        else:
            raise ValueError("Unsupported BlobType: {}".format(blob_type))
        return kwargs