alibabacloud_oss_v2/_client.py (481 lines of code) (raw):
import copy
import time
import base64
import re
from typing import Any, Optional, Dict, Iterable, List, Union, cast, Tuple, Iterator
from urllib.parse import urlparse, ParseResult, urlencode, quote
from xml.etree import ElementTree as ET
from . import retry
from . import transport
from . import exceptions
from . import utils
from . import defaults
from . import validation
from . import serde
from . import io_utils
from . import endpoints
from .signer import SignerV4, SignerV1
from .credentials import AnonymousCredentialsProvider
from .config import Config
from .types import (
Retryer,
CredentialsProvider,
HttpClient,
HttpRequest,
HttpResponse,
SigningContext,
Signer,
BodyType,
OperationInput,
OperationOutput,
)
class AddressStyle:
    """Ways a bucket can be addressed when the request URL is built."""

    # The bucket name is prepended to the endpoint host.
    Virtual = 1
    # The bucket name becomes the first segment of the URL path.
    Path = 2
    # The endpoint host is used unchanged; the bucket is not added to the URL.
    CName = 3
class _MarkedBody:
    """Wrap a request body so it can be rewound before a request is re-sent.

    Args:
        body (BodyType): The request body. ``None``, ``str``, ``bytes`` and
            re-iterable objects are treated as seekable; one-shot iterators
            and unknown types are not. For ``io_utils.TeeIterator`` and
            file objects the answer is delegated to the body / ``utils``.
    """

    def __init__(
        self,
        body: BodyType,
    ) -> None:
        self._body = body
        self._io_curr: int = 0
        self._is_fileobj = False

        # Start from "not seekable" and upgrade only for the known cases.
        # The order of the checks matters: an Iterator is also an Iterable,
        # so the Iterator test has to win.
        can_rewind = False
        if body is None:
            can_rewind = True
        elif isinstance(body, io_utils.TeeIterator):
            can_rewind = body.seekable()
        elif utils.is_fileobj(body):
            can_rewind = utils.is_seekable(body)
            self._is_fileobj = True
        elif (not isinstance(body, Iterator)
              and isinstance(body, (str, bytes, Iterable))):
            can_rewind = True
        self._seekable = can_rewind

    def is_seekable(self) -> bool:
        """Return whether the body can be rewound to the marked position."""
        return self._seekable

    def mark(self) -> None:
        """Remember the current position of the body.

        Only file objects carry a position; for every other body type this
        is a no-op.
        """
        if self._is_fileobj and self.is_seekable() is not False:
            self._io_curr = self._body.tell()

    def reset(self) -> None:
        """Rewind the body to the position recorded by ``mark``.

        Does nothing when the body is not seekable.
        """
        if self.is_seekable() is False:
            return

        payload = self._body
        if isinstance(payload, io_utils.TeeIterator):
            payload.reset()

        if self._is_fileobj:
            payload.seek(self._io_curr, 0)
class _Options:
    """Client-level configuration resolved from ``Config`` and keyword overrides.

    Every argument is stored on the instance under the same name. Two of
    them get a fallback when a falsy value is passed:

    * ``response_handlers`` becomes a new empty list.
    * ``feature_flags`` becomes ``defaults.FF_DEFAULT`` (note that this also
      applies to an explicit ``0``).
    """

    def __init__(
        self,
        product: str,
        region: str,
        endpoint: Optional[ParseResult] = None,
        retry_max_attempts: Optional[int] = None,
        retryer: Optional[Retryer] = None,
        signer: Optional[Signer] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
        http_client: Optional[Union[HttpClient]] = None,
        address_style: Optional[AddressStyle] = None,
        readwrite_timeout: Optional[Union[int, float]] = None,
        response_handlers: Optional[List] = None,
        response_stream: Optional[bool] = None,
        auth_method: Optional[str] = None,
        feature_flags: Optional[int] = None,
        additional_headers: Optional[List[str]] = None,
        operation_timeout: Optional[Union[int, float]] = None,
    ) -> None:
        # service identity and addressing
        self.product = product
        self.region = region
        self.endpoint = endpoint

        # retry behaviour
        self.retry_max_attempts = retry_max_attempts
        self.retryer = retryer

        # signing and transport
        self.signer = signer
        self.credentials_provider = credentials_provider
        self.http_client = http_client
        self.address_style = address_style
        self.readwrite_timeout = readwrite_timeout

        # response processing
        self.response_handlers = response_handlers if response_handlers else []
        self.response_stream = response_stream

        # miscellaneous
        self.auth_method = auth_method
        self.feature_flags = feature_flags if feature_flags else defaults.FF_DEFAULT
        self.additional_headers = additional_headers
        self.operation_timeout = operation_timeout
class _InnerOptions:
    """Runtime information kept by the client next to its options.

    Args:
        user_agent (str, optional): Value sent in the ``User-Agent`` header
            of each request.
    """

    def __init__(
        self,
        user_agent: str = None,
    ) -> None:
        self.user_agent = user_agent
class _ClientImplMixIn:
    """Client logic shared by concrete client implementations.

    The mix-in relies on the host class to provide ``_resolve_httpclient``,
    ``_apply_operation_options`` and the ``_options`` attribute.
    """

    # Option names that may be overridden when the client is constructed.
    _CLIENT_KWARGS = (
        "product",
        "region",
        "endpoint",
        "retry_max_attempts",
        "retryer",
        "signer",
        "credentials_provider",
        "http_client",
        "address_style",
        "readwrite_timeout",
        "auth_method",
        "additional_headers",
    )

    # Option names that may be overridden for a single operation.
    _OPERATION_KWARGS = (
        "retry_max_attempts",
        "retryer",
        "http_client",
        "readwrite_timeout",
        "auth_method",
        "operation_timeout",
    )

    def resolve_config(self, config: Config) -> Tuple[_Options, _InnerOptions]:
        """Convert *config* into the client's options.

        Args:
            config (Config): The user supplied configuration.

        Returns:
            Tuple[_Options, _InnerOptions]: The resolved options and the
            runtime information (currently the user agent).
        """
        options = _default_options(config)
        _resolve_endpoint(config, options)
        _resolve_retryer(config, options)
        _resolve_signer(config, options)
        _resolve_address_style(config, options)
        _resolve_feature_flags(config, options)
        _resolve_cloud_box(config, options)
        self._resolve_httpclient(config, options)  # pylint: disable=no-member

        inner = _InnerOptions()
        inner.user_agent = _build_user_agent(config)

        return options, inner

    def resolve_kwargs(self, options: _Options, **kwargs):
        """Apply client-level overrides given as keyword arguments.

        Only the names in ``_CLIENT_KWARGS`` are honoured. An option whose
        name is absent from *kwargs* keeps its current value; this includes
        ``auth_method``, which previously was reset to ``None`` whenever any
        other keyword was supplied.
        """
        for name in self._CLIENT_KWARGS:
            if name in kwargs:
                setattr(options, name, kwargs[name])

    def resolve_operation_kwargs(self, options: _Options, **kwargs):
        """Apply per-operation overrides given as keyword arguments.

        Only the names in ``_OPERATION_KWARGS`` are honoured. An option
        whose name is absent from *kwargs* keeps its current value; this
        includes ``operation_timeout``, which previously was reset to
        ``None`` whenever any other keyword was supplied.
        """
        for name in self._OPERATION_KWARGS:
            if name in kwargs:
                setattr(options, name, kwargs[name])

    def verify_operation(self, op_input: OperationInput, options: _Options) -> None:
        """Validate the operation input against the resolved options.

        Raises:
            exceptions.ParamInvalidError: No endpoint has been resolved.
            exceptions.BucketNameInvalidError: The bucket name is set but invalid.
            exceptions.ObjectNameInvalidError: The object key is set but invalid.
        """
        if not options.endpoint:
            raise exceptions.ParamInvalidError(field="endpoint")

        if (op_input.bucket is not None and
                not validation.is_valid_bucket_name(op_input.bucket)):
            raise exceptions.BucketNameInvalidError(
                name=utils.safety_str(op_input.bucket))

        if (op_input.key is not None and
                not validation.is_valid_object_name(op_input.key)):
            raise exceptions.ObjectNameInvalidError()

    def apply_operation(self, options: _Options, op_input: OperationInput) -> None:
        """Install the response handlers and operation metadata on *options*."""
        self._apply_operation_options(options)  # pylint: disable=no-member
        _apply_operation_metadata(op_input, options)

    def build_request_context(self, op_input: OperationInput, options: _Options, inner: _InnerOptions
                              ) -> SigningContext:
        """Build the HTTP request and the signing context for an operation.

        Args:
            op_input (OperationInput): The operation to send.
            options (_Options): The options effective for this operation.
            inner (_InnerOptions): Runtime information; supplies the user agent.

        Returns:
            SigningContext: The context wrapping the prepared ``HttpRequest``.
        """
        # A missing metadata mapping is treated as empty everywhere below;
        # previously only the body-tracker lookup tolerated ``None``.
        metadata = op_input.op_metadata if op_input.op_metadata is not None else {}

        # host & path
        url = _build_url(op_input, options)

        # queries
        if op_input.parameters is not None:
            query = urlencode(op_input.parameters, quote_via=quote)
            if len(query) > 0:
                url = url + "?" + query

        # build http request
        request = HttpRequest(method=op_input.method, url=url)

        # headers; the user agent is written last so it always wins
        request.headers.update(op_input.headers or {})
        request.headers.update({'User-Agent': inner.user_agent})

        # body
        body = op_input.body or b''

        # body tracker: every tracker exposing ``write`` receives a copy of
        # the data as the body is consumed
        tracker = metadata.get("opm-request-body-tracker", None)
        if tracker is not None:
            writers = [t for t in tracker if hasattr(t, 'write')]
            if len(writers) > 0:
                body = io_utils.TeeIterator.from_source(body, writers)

        request.body = body

        # signing context
        context = SigningContext(
            product=options.product,
            region=options.region,
            bucket=op_input.bucket,
            key=op_input.key,
            request=request,
        )

        if utils.safety_str(options.auth_method) == 'query':
            context.auth_method_query = True

        # An explicit x-oss-date header pins the signing time.
        oss_date = request.headers.get('x-oss-date', None)
        if oss_date is not None:
            context.signing_time = serde.deserialize_httptime(oss_date)

        if (expiration_time := metadata.get('expiration_time', None)) is not None:
            context.expiration_time = expiration_time

        context.sub_resource = metadata.get("sub-resource", [])

        return context

    def retry_max_attempts(self, options: _Options) -> int:
        """Return the effective number of attempts, never less than 1.

        ``options.retry_max_attempts`` takes precedence, then the retryer's
        own ``max_attempts()``, then ``defaults.DEFAULT_MAX_ATTEMPTS``.
        """
        if options.retry_max_attempts is not None:
            attempts = int(options.retry_max_attempts)
        elif options.retryer is not None:
            attempts = options.retryer.max_attempts()
        else:
            attempts = defaults.DEFAULT_MAX_ATTEMPTS

        return max(1, attempts)

    def has_feature(self, flag: int) -> bool:
        """Return whether *flag* is set in the client's feature flags."""
        return (self._options.feature_flags & flag) > 0  # pylint: disable=no-member

    def get_retry_attempts(self) -> int:
        """Return the effective number of attempts for the client's own options."""
        return self.retry_max_attempts(self._options)  # pylint: disable=no-member
class _SyncClientImpl(_ClientImplMixIn):
    """Synchronous client implementation shared by all API operations."""

    def __init__(self, config: Config, **kwargs) -> None:
        """Resolve *config* and the keyword overrides into client options."""
        options, inner = self.resolve_config(config)
        self.resolve_kwargs(options, **kwargs)

        self._config = config
        self._options = options
        self._inner = inner

    def invoke_operation(self, op_input: OperationInput, **kwargs) -> OperationOutput:
        """Validate, sign and send a single operation.

        Args:
            op_input (OperationInput): The operation to execute.
            **kwargs: Per-operation option overrides, see
                ``resolve_operation_kwargs``.

        Raises:
            exceptions.OperationError: Validation or sending failed. The
                original exception is passed as ``error`` and is also set as
                the explicit cause.

        Returns:
            OperationOutput: The operation result wrapping the HTTP response.
        """
        # Work on a shallow copy so per-operation overrides never leak into
        # the client's own options.
        options = copy.copy(self._options)
        self.resolve_operation_kwargs(options, **kwargs)
        self.apply_operation(options, op_input)

        try:
            self.verify_operation(op_input, options)
            output = self._sent_request(op_input, options)
        except Exception as err:
            raise exceptions.OperationError(
                name=op_input.op_name,
                error=err,
            ) from err

        return output

    def _resolve_httpclient(self, config: Config, options: _Options) -> None:
        """Create the default HTTP client unless one is already configured."""
        if options.http_client:
            return

        # Only settings that are actually set in the config are forwarded.
        kwargs: Dict[str, Any] = {}

        if bool(config.insecure_skip_verify):
            kwargs["insecure_skip_verify"] = True

        if bool(config.enabled_redirect):
            kwargs["enabled_redirect"] = True

        if config.connect_timeout:
            kwargs["connect_timeout"] = config.connect_timeout

        if config.readwrite_timeout:
            kwargs["readwrite_timeout"] = config.readwrite_timeout

        if config.proxy_host:
            kwargs["proxy_host"] = config.proxy_host

        options.http_client = transport.RequestsHttpClient(**kwargs)

    def _apply_operation_options(self, options: _Options) -> None:
        """Put the service-error handler in front of the response handlers.

        A new list is assigned to ``options.response_handlers`` so the list
        held by the client's own options is not modified.
        """
        handlers = []

        def service_error_response_handler(response: HttpResponse) -> None:
            """Raise a service error for any non-2xx response."""
            if response.status_code // 100 == 2:
                return

            # Make sure the body is loaded before it is parsed into an error.
            if not response.is_stream_consumed:
                _ = response.read()

            raise _to_service_error(response)

        # the service error handler always runs first
        handlers.append(service_error_response_handler)
        handlers.extend(options.response_handlers)

        options.response_handlers = handlers

    def _sent_request(self, op_input: OperationInput, options: _Options) -> OperationOutput:
        """Send the operation and wrap the HTTP response into an output."""
        context = self.build_request_context(op_input, options, self._inner)
        response = self._sent_http_request(context, options)

        output = OperationOutput(
            status=response.reason,
            status_code=response.status_code,
            headers=response.headers,
            op_input=op_input,
            http_response=response
        )

        # For query-style auth the expiration is handed back via the metadata.
        if context.auth_method_query:
            output.op_metadata['expiration_time'] = context.expiration_time

        return output

    def _sent_http_request(self, context: SigningContext, options: _Options) -> HttpResponse:
        """Send the request, retrying according to the options.

        A retry happens only when the body can be rewound, the retryer
        classifies the error as retryable, attempts remain and the optional
        operation timeout has not elapsed.

        Raises:
            Exception: The error of the last attempt when no attempt succeeded.
        """
        request = context.request
        retryer = options.retryer
        max_attempts = self.retry_max_attempts(options)

        # operation timeout, expressed as an absolute deadline
        deadline = None
        if isinstance(options.operation_timeout, (int, float)):
            deadline = time.time() + options.operation_timeout

        # remember where the body starts so it can be replayed
        marked_body = _MarkedBody(request.body)
        marked_body.mark()

        # When the caller did not pin the signing time, it is cleared again
        # before every retry.
        reset_time = context.signing_time is None
        error: Optional[Exception] = None
        response: Optional[HttpResponse] = None
        for tries in range(max_attempts):
            if tries > 0:
                try:
                    marked_body.reset()
                except Exception:  # pylint: disable=broad-except
                    # The body cannot be replayed: give up and report the
                    # error of the previous attempt. KeyboardInterrupt and
                    # SystemExit are deliberately not intercepted here.
                    break

                if reset_time:
                    context.signing_time = None

                delay = retryer.retry_delay(tries, error)
                time.sleep(delay)

                # the back-off may have used up the remaining time
                if deadline is not None and (time.time() > deadline):
                    break

            try:
                error = None
                response = self._sent_http_request_once(context, options)
                break
            except Exception as e:
                error = e

            # operation timeout
            if deadline is not None and (time.time() > deadline):
                break

            if marked_body.is_seekable() is False:
                break

            if not retryer.is_error_retryable(error):
                break

        if error is not None:
            raise error

        return response

    def _sent_http_request_once(self, context: SigningContext, options: _Options) -> HttpResponse:
        """Sign and send the request once, then run the response handlers.

        Raises:
            exceptions.CredentialsFetchError: The provider failed to return
                credentials.
            exceptions.CredentialsEmptyError: The provider returned no usable
                credentials.
        """
        # sign request; anonymous access is sent unsigned
        if not isinstance(options.credentials_provider, AnonymousCredentialsProvider):
            try:
                cred = options.credentials_provider.get_credentials()
            except Exception as e:
                raise exceptions.CredentialsFetchError(error=e) from e

            if cred is None or not cred.has_keys():
                raise exceptions.CredentialsEmptyError()

            # update credentials
            context.credentials = cred

            options.signer.sign(context)

        # send; only explicitly configured settings are forwarded
        send_kwargs = {}
        if options.response_stream is not None:
            send_kwargs['stream'] = options.response_stream

        if options.readwrite_timeout is not None:
            send_kwargs['readwrite_timeout'] = options.readwrite_timeout

        response = options.http_client.send(context.request, **send_kwargs)

        # response handlers; any of them may raise to fail the attempt
        for h in options.response_handlers:
            h(response)

        return response
def _default_options(config: Config) -> _Options:
    """Create the initial options from the values carried by *config*.

    The product is always ``defaults.DEFAULT_PRODUCT``; the remaining
    fields are copied from the config as they are.
    """
    retryer = cast(Retryer, config.retryer)
    provider = cast(CredentialsProvider, config.credentials_provider)
    client = cast(HttpClient, config.http_client)

    return _Options(
        product=defaults.DEFAULT_PRODUCT,
        region=config.region,
        retry_max_attempts=config.retry_max_attempts,
        retryer=retryer,
        credentials_provider=provider,
        http_client=client,
        additional_headers=config.additional_headers,
    )
def _resolve_endpoint(config: Config, options: _Options) -> None:
    """Resolve the endpoint and store it, parsed, on *options*.

    An explicit ``config.endpoint`` wins. Otherwise the endpoint is derived
    from a valid region, choosing the dualstack, internal, accelerate or
    default variant in that order of precedence. When nothing can be
    resolved, ``options.endpoint`` is left untouched.
    """
    ssl_disabled = utils.safety_bool(config.disable_ssl)
    configured = utils.safety_str(config.endpoint)
    region_id = utils.safety_str(config.region)

    if len(configured) > 0:
        resolved = endpoints.add_scheme(configured, ssl_disabled)
    elif validation.is_valid_region(region_id):
        if bool(config.use_dualstack_endpoint):
            variant = "dualstack"
        elif bool(config.use_internal_endpoint):
            variant = "internal"
        elif bool(config.use_accelerate_endpoint):
            variant = "accelerate"
        else:
            variant = "default"
        resolved = endpoints.from_region(region_id, ssl_disabled, variant)
    else:
        resolved = configured

    if resolved != "":
        options.endpoint = urlparse(resolved)
def _resolve_retryer(_: Config, options: _Options) -> None:
    """Install ``retry.StandardRetryer`` when no retryer is configured."""
    if not options.retryer:
        options.retryer = retry.StandardRetryer()
def _resolve_signer(config: Config, options: _Options) -> None:
    """Install the signer matching ``config.signature_version``.

    An already configured signer is kept. Otherwise ``"v1"`` selects
    ``SignerV1`` and any other value selects ``SignerV4``.
    """
    if options.signer:
        return

    wants_v1 = utils.safety_str(config.signature_version) == "v1"
    options.signer = SignerV1() if wants_v1 else SignerV4()
def _resolve_address_style(config: Config, options: _Options) -> None:
    """Choose how buckets are addressed and store it on *options*.

    ``use_cname`` takes precedence over ``use_path_style``; virtual style is
    the fallback. An endpoint whose host is an IP address always forces
    path style.
    """
    chosen = AddressStyle.Virtual
    if bool(config.use_cname):
        chosen = AddressStyle.CName
    elif bool(config.use_path_style):
        chosen = AddressStyle.Path

    # if the endpoint is ip, set to path-style
    endpoint = options.endpoint
    if endpoint and endpoints.is_ip(endpoint.hostname):
        chosen = AddressStyle.Path

    options.address_style = chosen
def _resolve_feature_flags(config: Config, options: _Options) -> None:
    """Clear the CRC64 check flags that the config disables."""
    # Collect the bits to clear first, then clear them in one step.
    cleared = 0
    if utils.safety_bool(config.disable_upload_crc64_check):
        cleared |= defaults.FF_ENABLE_CRC64_CHECK_UPLOAD
    if utils.safety_bool(config.disable_download_crc64_check):
        cleared |= defaults.FF_ENABLE_CRC64_CHECK_DOWNLOAD

    if cleared:
        options.feature_flags = options.feature_flags & ~cleared
def _resolve_cloud_box(config: Config, options: _Options) -> None:
    """Switch region and product to cloud box when one is configured or detected.

    An explicit ``config.cloud_box_id`` always wins. Otherwise, when
    ``config.enable_auto_detect_cloud_box_id`` is set, the id is taken from
    the first label of an endpoint host of the form
    ``cb-xxx.<region>.oss-cloudbox[-control].aliyuncs.com``.

    When no endpoint (or no host name) has been resolved, detection is
    skipped instead of failing with ``AttributeError``; the missing endpoint
    is reported later by the operation validation.
    """
    if config.cloud_box_id is not None:
        options.region = str(config.cloud_box_id)
        options.product = defaults.CLOUD_BOX_PRODUCT
        return

    if not config.enable_auto_detect_cloud_box_id:
        return

    endpoint = options.endpoint
    host = endpoint.hostname if endpoint else None
    if not host:
        return

    if not host.endswith((".oss-cloudbox.aliyuncs.com",
                          ".oss-cloudbox-control.aliyuncs.com")):
        return

    # Exactly five labels are expected, the first one being the cloud box id.
    keys = host.split(".")
    if len(keys) != 5 or not keys[0].startswith("cb-"):
        return

    options.region = keys[0]
    options.product = defaults.CLOUD_BOX_PRODUCT
def _apply_operation_metadata(op_input: OperationInput, options: _Options) -> None:
    """Copy per-operation settings from the operation metadata into *options*.

    ``opm-response-handler`` entries are appended to the response handlers,
    and ``response-stream`` overrides the streaming flag. Keys that are
    missing or ``None`` leave the options unchanged.
    """
    extra_handlers = op_input.op_metadata.get('opm-response-handler', None)
    if extra_handlers is not None:
        options.response_handlers.extend(extra_handlers)

    stream_flag = op_input.op_metadata.get('response-stream', None)
    if stream_flag is not None:
        options.response_stream = stream_flag
def _build_url(op_input: OperationInput, options: _Options) -> str:
    """Compose the request URL (without query string) for an operation.

    Depending on the address style, the bucket goes into the host (virtual),
    into the path (path style) or nowhere (cname). The object key, if any,
    is percent-encoded and appended to the path.
    """
    endpoint = options.endpoint
    bucket = op_input.bucket
    key = op_input.key

    segments = []
    if bucket is None:
        host = endpoint.netloc
    elif options.address_style == AddressStyle.Path:
        host = endpoint.netloc
        segments.append(bucket)
        # keep a trailing slash after the bucket for bucket-level requests
        if key is None:
            segments.append('')
    elif options.address_style == AddressStyle.CName:
        host = endpoint.netloc
    else:
        host = f'{bucket}.{endpoint.netloc}'

    if key is not None:
        segments.append(quote(key))

    path = "/".join(segments)
    return f'{endpoint.scheme}://{host}/{path}'
def _to_service_error(response: HttpResponse) -> exceptions.ServiceError:
    """Convert a failed HTTP response into a ``ServiceError``.

    The error document is read from the response body or, when the body is
    empty, from the base64 encoded ``x-oss-err`` header. If the document is
    not well-formed XML, the code and message are extracted on a best-effort
    basis with regular expressions. The response is closed before parsing.

    Args:
        response (HttpResponse): The non-successful response; its content
            must already be loaded.

    Returns:
        exceptions.ServiceError: The error describing the failure. The code
        is ``BadErrorResponse`` when no error document could be parsed.
    """
    timestamp = serde.deserialize_httptime(response.headers.get('Date'))
    content = response.content or b''
    response.close()

    error_fileds = {}
    code = 'BadErrorResponse'
    message = ''
    ec = ''
    request_id = ''
    err_body = b''
    try:
        err_body = content
        if len(err_body) == 0:
            err_body = base64.b64decode(
                response.headers.get('x-oss-err', ''))
        root = ET.fromstring(err_body)
        if root.tag == 'Error':
            for child in root:
                error_fileds[child.tag] = child.text
            message = error_fileds.get('Message', '')
            code = error_fileds.get('Code', '')
            ec = error_fileds.get('EC', '')
            request_id = error_fileds.get('RequestId', '')
        else:
            message = f'Expect root node Error, but get {root.tag}.'
    except ET.ParseError as e:
        # Undecodable bytes are replaced rather than allowed to raise: an
        # exception escaping this handler would hide the service error.
        errstr = err_body.decode('utf-8', errors='replace')
        if '<Error>' in errstr and '</Error>' in errstr:
            m = re.search(r'<Code>(.*)</Code>', errstr)
            if m:
                code = m.group(1)
            m = re.search(r'<Message>(.*)</Message>', errstr)
            if m:
                message = m.group(1)
        if len(message) == 0:
            message = f'Failed to parse xml from response body due to: {str(e)}. With part response body {err_body[:256]}.'
    except Exception as e:
        message = f'The body of the response was not readable, due to : {str(e)}.'

    # Header values serve as fallback when the document lacks the fields.
    return exceptions.ServiceError(
        status_code=response.status_code,
        code=code,
        message=message,
        request_id=request_id or response.headers.get('x-oss-request-id', ''),
        ec=ec or response.headers.get('x-oss-ec', ''),
        timestamp=timestamp,
        request_target=f'{response.request.method} {response.request.url}',
        snapshot=content,
        headers=response.headers,
        error_fileds=error_fileds
    )
def _build_user_agent(config: Config) -> str:
    """Return the default user agent, suffixed with ``config.user_agent`` if set."""
    base = utils.get_default_user_agent()
    suffix = config.user_agent
    return f'{base}/{suffix}' if suffix else base