in azure/multiapi/storagev2/blob/v2021_08_06/_blob_client.py [0:0]
def _upload_blob_options( # pylint:disable=too-many-statements
self, data, # type: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]]
blob_type=BlobType.BlockBlob, # type: Union[str, BlobType]
length=None, # type: Optional[int]
metadata=None, # type: Optional[Dict[str, str]]
**kwargs
):
# type: (...) -> Dict[str, Any]
if self.require_encryption and not self.key_encryption_key:
raise ValueError("Encryption required but no key was provided.")
encryption_options = {
'required': self.require_encryption,
'version': self.encryption_version,
'key': self.key_encryption_key,
'resolver': self.key_resolver_function,
}
encoding = kwargs.pop('encoding', 'UTF-8')
if isinstance(data, six.text_type):
data = data.encode(encoding) # type: ignore
if length is None:
length = get_length(data)
if isinstance(data, bytes):
data = data[:length]
if isinstance(data, bytes):
stream = BytesIO(data)
elif hasattr(data, 'read'):
stream = data
elif hasattr(data, '__iter__') and not isinstance(data, (list, tuple, set, dict)):
stream = IterStreamer(data, encoding=encoding)
else:
raise TypeError("Unsupported data type: {}".format(type(data)))
validate_content = kwargs.pop('validate_content', False)
content_settings = kwargs.pop('content_settings', None)
overwrite = kwargs.pop('overwrite', False)
max_concurrency = kwargs.pop('max_concurrency', 1)
cpk = kwargs.pop('cpk', None)
cpk_info = None
if cpk:
if self.scheme.lower() != 'https':
raise ValueError("Customer provided encryption key must be used over HTTPS.")
cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
encryption_algorithm=cpk.algorithm)
kwargs['cpk_info'] = cpk_info
headers = kwargs.pop('headers', {})
headers.update(add_metadata_headers(metadata))
kwargs['lease_access_conditions'] = get_access_conditions(kwargs.pop('lease', None))
kwargs['modified_access_conditions'] = get_modify_conditions(kwargs)
kwargs['cpk_scope_info'] = get_cpk_scope_info(kwargs)
if content_settings:
kwargs['blob_headers'] = BlobHTTPHeaders(
blob_cache_control=content_settings.cache_control,
blob_content_type=content_settings.content_type,
blob_content_md5=content_settings.content_md5,
blob_content_encoding=content_settings.content_encoding,
blob_content_language=content_settings.content_language,
blob_content_disposition=content_settings.content_disposition
)
kwargs['blob_tags_string'] = serialize_blob_tags_header(kwargs.pop('tags', None))
kwargs['stream'] = stream
kwargs['length'] = length
kwargs['overwrite'] = overwrite
kwargs['headers'] = headers
kwargs['validate_content'] = validate_content
kwargs['blob_settings'] = self._config
kwargs['max_concurrency'] = max_concurrency
kwargs['encryption_options'] = encryption_options
if blob_type == BlobType.BlockBlob:
kwargs['client'] = self._client.block_blob
kwargs['data'] = data
elif blob_type == BlobType.PageBlob:
if self.encryption_version == '2.0' and (self.require_encryption or self.key_encryption_key is not None):
raise ValueError("Encryption version 2.0 does not currently support page blobs.")
kwargs['client'] = self._client.page_blob
elif blob_type == BlobType.AppendBlob:
if self.require_encryption or (self.key_encryption_key is not None):
raise ValueError(_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION)
kwargs['client'] = self._client.append_blob
else:
raise ValueError("Unsupported BlobType: {}".format(blob_type))
return kwargs