alibabacloud-gateway-oss/python/alibabacloud_gateway_oss/client.py (931 lines of code) (raw):

# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. from alibabacloud_darabonba_encode_util.encoder import Encoder from alibabacloud_darabonba_signature_util.signer import Signer from Tea.exceptions import TeaException from typing import List, Dict from Tea.core import TeaCore from alibabacloud_gateway_spi.client import Client as SPIClient from alibabacloud_gateway_spi import models as spi_models from alibabacloud_tea_util.client import Client as UtilClient from alibabacloud_darabonba_map.client import Client as MapClient from alibabacloud_darabonba_string.client import Client as StringClient from alibabacloud_tea_xml.client import Client as XMLClient from alibabacloud_openapi_util.client import Client as OpenApiUtilClient from alibabacloud_oss_util.client import Client as OSSUtilClient from alibabacloud_darabonba_array.client import Client as ArrayClient from alibabacloud_gateway_oss_util.client import Client as OSS_UtilClient class Client(SPIClient): _default_signed_params: List[str] = None _except_signed_params: List[str] = None def __init__(self): super().__init__() self._default_signed_params = [ 'response-content-type', 'response-content-language', 'response-cache-control', 'logging', 'response-content-encoding', 'acl', 'uploadId', 'uploads', 'partNumber', 'group', 'link', 'delete', 'website', 'location', 'objectInfo', 'objectMeta', 'response-expires', 'response-content-disposition', 'cors', 'lifecycle', 'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token', 'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process', 'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions', 'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo', 'asyncFetch', 'x-oss-request-payer', 'sequential', 'inventory', 'inventoryId', 'continuation-token', 'callback', 'callback-var', 'worm', 'wormId', 'wormExtend', 'replication', 'replicationLocation', 'replicationProgress', 'transferAcceleration', 'cname', 'metaQuery', 'x-oss-ac-source-ip', 'x-oss-ac-subnet-mask', 'x-oss-ac-vpc-id', 'x-oss-ac-forward-allow', 'resourceGroup', 'style', 'styleName', 'x-oss-async-process', 'rtc', 'accessPoint', 'accessPointPolicy', 'httpsConfig', 'regionsV2', 'publicAccessBlock', 'policyStatus', 'redundancyTransition', 'redundancyType', 'redundancyProgress', 'dataAccelerator', 'verbose', 'accessPointForObjectProcess', 'accessPointConfigForObjectProcess', 'accessPointPolicyForObjectProcess', 'bucketArchiveDirectRead', 'responseHeader', 'userDefinedLogFieldsConfig', 'reservedcapacity', 'requesterQosInfo', 'qosRequester', 'resourcePool', 'resourcePoolInfo', 'resourcePoolBuckets', 'processConfiguration', 'img', 'asyncFetch', 'virtualBucket', 'copy', 'userRegion', 'partSize', 'chunkSize', 'partUploadId', 'chunkNumber', 'userRegion', 'regionList', 'eventnotification', 'cacheConfiguration', 'dfs', 'dfsadmin', 'dfssecurity' ] self._except_signed_params = [ 'list-type', 'regions' ] def modify_configuration( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: config = context.configuration config.endpoint = self.get_endpoint(config.region_id, config.network, config.endpoint) async def modify_configuration_async( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: config = context.configuration config.endpoint = await self.get_endpoint_async(config.region_id, config.network, config.endpoint) def modify_request( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request host_map = {} if not UtilClient.is_unset(request.host_map): host_map = request.host_map bucket_name = host_map.get('bucket') if UtilClient.is_unset(bucket_name): bucket_name = '' if not UtilClient.is_unset(request.headers.get('x-oss-meta-*')): tmp = UtilClient.parse_json(request.headers.get('x-oss-meta-*')) map_data = UtilClient.assert_as_map(tmp) meta_data = UtilClient.stringify_map_value(map_data) meta_key_set = MapClient.key_set(meta_data) request.headers['x-oss-meta-*'] = None for key in meta_key_set: new_key = f'x-oss-meta-{key}' request.headers[new_key] = meta_data.get(key) config = context.configuration region_id = config.region_id if UtilClient.is_unset(region_id) or UtilClient.empty(region_id): region_id = self.get_region_id_from_endpoint(config.endpoint) credential = request.credential access_key_id = credential.get_access_key_id() access_key_secret = credential.get_access_key_secret() security_token = credential.get_security_token() if not UtilClient.empty(security_token): request.headers['x-oss-security-token'] = security_token if not UtilClient.is_unset(request.body): if StringClient.equals(request.req_body_type, 'xml'): req_body_map = UtilClient.assert_as_map(request.body) xml_str = OSS_UtilClient.to_xml(req_body_map) request.stream = xml_str request.headers['content-type'] = 'application/xml' request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign(xml_str)) elif StringClient.equals(request.req_body_type, 'json'): req_body_str = UtilClient.to_jsonstring(request.body) request.stream = req_body_str request.headers['content-type'] = 'application/json; charset=utf-8' elif StringClient.equals(request.req_body_type, 'formData'): req_body_form = UtilClient.assert_as_map(request.body) request.stream = OpenApiUtilClient.to_form(req_body_form) request.headers['content-type'] = 'application/x-www-form-urlencoded' elif StringClient.equals(request.req_body_type, 'binary'): attribute_map.key = { 'crc': '', 'md5': '' } request.stream = OSSUtilClient.inject(request.stream, attribute_map.key) request.headers['content-type'] = 'application/octet-stream' host = self.get_host(config.endpoint_type, bucket_name, config.endpoint, context) request.headers = TeaCore.merge({ 'host': host, 'date': UtilClient.get_date_utcstring(), 'user-agent': request.user_agent }, request.headers) origin_path = request.pathname origin_query = request.query if not UtilClient.empty(origin_path): path_and_queries = StringClient.split(origin_path, f'?', 2) request.pathname = path_and_queries[0] if UtilClient.equal_number(ArrayClient.size(path_and_queries), 2): path_queries = StringClient.split(path_and_queries[1], '&', None) for sub in path_queries: item = StringClient.split(sub, '=', None) query_key = item[0] query_value = '' if UtilClient.equal_number(ArrayClient.size(item), 2): query_value = item[1] if UtilClient.empty(origin_query.get(query_key)): request.query[query_key] = query_value signature_version = UtilClient.default_string(request.signature_version, 'v4') request.headers['authorization'] = self.get_authorization(signature_version, bucket_name, request.pathname, request.method, request.query, request.headers, access_key_id, access_key_secret, region_id) async def modify_request_async( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request host_map = {} if not UtilClient.is_unset(request.host_map): host_map = request.host_map bucket_name = host_map.get('bucket') if UtilClient.is_unset(bucket_name): bucket_name = '' if not UtilClient.is_unset(request.headers.get('x-oss-meta-*')): tmp = UtilClient.parse_json(request.headers.get('x-oss-meta-*')) map_data = UtilClient.assert_as_map(tmp) meta_data = UtilClient.stringify_map_value(map_data) meta_key_set = MapClient.key_set(meta_data) request.headers['x-oss-meta-*'] = None for key in meta_key_set: new_key = f'x-oss-meta-{key}' request.headers[new_key] = meta_data.get(key) config = context.configuration region_id = config.region_id if UtilClient.is_unset(region_id) or UtilClient.empty(region_id): region_id = await self.get_region_id_from_endpoint_async(config.endpoint) credential = request.credential access_key_id = await credential.get_access_key_id_async() access_key_secret = await credential.get_access_key_secret_async() security_token = await credential.get_security_token_async() if not UtilClient.empty(security_token): request.headers['x-oss-security-token'] = security_token if not UtilClient.is_unset(request.body): if StringClient.equals(request.req_body_type, 'xml'): req_body_map = UtilClient.assert_as_map(request.body) xml_str = OSS_UtilClient.to_xml(req_body_map) request.stream = xml_str request.headers['content-type'] = 'application/xml' request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign(xml_str)) elif StringClient.equals(request.req_body_type, 'json'): req_body_str = UtilClient.to_jsonstring(request.body) request.stream = req_body_str request.headers['content-type'] = 'application/json; charset=utf-8' elif StringClient.equals(request.req_body_type, 'formData'): req_body_form = UtilClient.assert_as_map(request.body) request.stream = OpenApiUtilClient.to_form(req_body_form) request.headers['content-type'] = 'application/x-www-form-urlencoded' elif StringClient.equals(request.req_body_type, 'binary'): attribute_map.key = { 'crc': '', 'md5': '' } request.stream = OSSUtilClient.inject(request.stream, attribute_map.key) request.headers['content-type'] = 'application/octet-stream' host = await self.get_host_async(config.endpoint_type, bucket_name, config.endpoint, context) request.headers = TeaCore.merge({ 'host': host, 'date': UtilClient.get_date_utcstring(), 'user-agent': request.user_agent }, request.headers) origin_path = request.pathname origin_query = request.query if not UtilClient.empty(origin_path): path_and_queries = StringClient.split(origin_path, f'?', 2) request.pathname = path_and_queries[0] if UtilClient.equal_number(ArrayClient.size(path_and_queries), 2): path_queries = StringClient.split(path_and_queries[1], '&', None) for sub in path_queries: item = StringClient.split(sub, '=', None) query_key = item[0] query_value = '' if UtilClient.equal_number(ArrayClient.size(item), 2): query_value = item[1] if UtilClient.empty(origin_query.get(query_key)): request.query[query_key] = query_value signature_version = UtilClient.default_string(request.signature_version, 'v4') request.headers['authorization'] = await self.get_authorization_async(signature_version, bucket_name, request.pathname, request.method, request.query, request.headers, access_key_id, access_key_secret, region_id) def modify_response( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request response = context.response body_str = None if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code): body_str = UtilClient.read_as_string(response.body) if not UtilClient.empty(body_str): resp_map = XMLClient.parse_xml(body_str, None) err = UtilClient.assert_as_map(resp_map.get('Error')) raise TeaException({ 'code': err.get('Code'), 'message': err.get('Message'), 'data': { 'statusCode': response.status_code, 'requestId': err.get('RequestId'), 'ecCode': err.get('EC'), 'Recommend': err.get('RecommendDoc'), 'hostId': err.get('HostId'), 'AccessDeniedDetail': err.get('AccessDeniedDetail') } }) else: headers = response.headers request_id = headers.get('x-oss-request-id') ec_code = headers.get('x-oss-ec-code') raise TeaException({ 'code': response.status_code, 'message': None, 'data': { 'statusCode': response.status_code, 'requestId': f'{request_id}', 'ecCode': ec_code } }) ctx = attribute_map.key if not UtilClient.is_unset(ctx): if not UtilClient.is_unset(ctx.get('crc')) and not UtilClient.is_unset(response.headers.get('x-oss-hash-crc64ecma')) and not StringClient.equals(ctx.get('crc'), response.headers.get('x-oss-hash-crc64ecma')): raise TeaException({ 'code': 'CrcNotMatched', 'data': { 'clientCrc': ctx.get('crc'), 'serverCrc': response.headers.get('x-oss-hash-crc64ecma') } }) if not UtilClient.is_unset(ctx.get('md5')) and not UtilClient.is_unset(response.headers.get('content-md5')) and not StringClient.equals(ctx.get('md5'), response.headers.get('content-md5')): raise TeaException({ 'code': 'MD5NotMatched', 'data': { 'clientMD5': ctx.get('md5'), 'serverMD5': response.headers.get('content-md5') } }) if not UtilClient.is_unset(response.body): if UtilClient.equal_number(response.status_code, 204): UtilClient.read_as_string(response.body) elif StringClient.equals(request.body_type, 'xml'): body_str = UtilClient.read_as_string(response.body) response.deserialized_body = body_str if not UtilClient.empty(body_str): result = OSS_UtilClient.parse_xml(body_str, request.action) # for no util language # var result : any = XML.parseXml(bodyStr, null); try: response.deserialized_body = UtilClient.assert_as_map(result) except Exception as error: response.deserialized_body = result elif UtilClient.equal_string(request.body_type, 'binary'): response.deserialized_body = response.body elif UtilClient.equal_string(request.body_type, 'byte'): byt = UtilClient.read_as_bytes(response.body) response.deserialized_body = byt elif UtilClient.equal_string(request.body_type, 'string'): response.deserialized_body = UtilClient.read_as_string(response.body) elif UtilClient.equal_string(request.body_type, 'json'): obj = UtilClient.read_as_json(response.body) res = UtilClient.assert_as_map(obj) response.deserialized_body = res elif UtilClient.equal_string(request.body_type, 'array'): response.deserialized_body = UtilClient.read_as_json(response.body) else: response.deserialized_body = UtilClient.read_as_string(response.body) async def modify_response_async( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request response = context.response body_str = None if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code): body_str = await UtilClient.read_as_string_async(response.body) if not UtilClient.empty(body_str): resp_map = XMLClient.parse_xml(body_str, None) err = UtilClient.assert_as_map(resp_map.get('Error')) raise TeaException({ 'code': err.get('Code'), 'message': err.get('Message'), 'data': { 'statusCode': response.status_code, 'requestId': err.get('RequestId'), 'ecCode': err.get('EC'), 'Recommend': err.get('RecommendDoc'), 'hostId': err.get('HostId'), 'AccessDeniedDetail': err.get('AccessDeniedDetail') } }) else: headers = response.headers request_id = headers.get('x-oss-request-id') ec_code = headers.get('x-oss-ec-code') raise TeaException({ 'code': response.status_code, 'message': None, 'data': { 'statusCode': response.status_code, 'requestId': f'{request_id}', 'ecCode': ec_code } }) ctx = attribute_map.key if not UtilClient.is_unset(ctx): if not UtilClient.is_unset(ctx.get('crc')) and not UtilClient.is_unset(response.headers.get('x-oss-hash-crc64ecma')) and not StringClient.equals(ctx.get('crc'), response.headers.get('x-oss-hash-crc64ecma')): raise TeaException({ 'code': 'CrcNotMatched', 'data': { 'clientCrc': ctx.get('crc'), 'serverCrc': response.headers.get('x-oss-hash-crc64ecma') } }) if not UtilClient.is_unset(ctx.get('md5')) and not UtilClient.is_unset(response.headers.get('content-md5')) and not StringClient.equals(ctx.get('md5'), response.headers.get('content-md5')): raise TeaException({ 'code': 'MD5NotMatched', 'data': { 'clientMD5': ctx.get('md5'), 'serverMD5': response.headers.get('content-md5') } }) if not UtilClient.is_unset(response.body): if UtilClient.equal_number(response.status_code, 204): await UtilClient.read_as_string_async(response.body) elif StringClient.equals(request.body_type, 'xml'): body_str = await UtilClient.read_as_string_async(response.body) response.deserialized_body = body_str if not UtilClient.empty(body_str): result = await OSS_UtilClient.parse_xml_async(body_str, request.action) # for no util language # var result : any = XML.parseXml(bodyStr, null); try: response.deserialized_body = UtilClient.assert_as_map(result) except Exception as error: response.deserialized_body = result elif UtilClient.equal_string(request.body_type, 'binary'): response.deserialized_body = response.body elif UtilClient.equal_string(request.body_type, 'byte'): byt = await UtilClient.read_as_bytes_async(response.body) response.deserialized_body = byt elif UtilClient.equal_string(request.body_type, 'string'): response.deserialized_body = await UtilClient.read_as_string_async(response.body) elif UtilClient.equal_string(request.body_type, 'json'): obj = await UtilClient.read_as_json_async(response.body) res = UtilClient.assert_as_map(obj) response.deserialized_body = res elif UtilClient.equal_string(request.body_type, 'array'): response.deserialized_body = await UtilClient.read_as_json_async(response.body) else: response.deserialized_body = await UtilClient.read_as_string_async(response.body) def get_region_id_from_endpoint( self, endpoint: str, ) -> str: if not UtilClient.empty(endpoint): idx = -1 if StringClient.has_prefix(endpoint, 'oss-') and StringClient.has_suffix(endpoint, '.aliyuncs.com'): idx = StringClient.index(endpoint, '.aliyuncs.com') return StringClient.sub_string(endpoint, 4, idx) if StringClient.has_suffix(endpoint, '.mgw.aliyuncs.com'): idx = StringClient.index(endpoint, '.mgw.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) if StringClient.has_suffix(endpoint, '.mgw-internal.aliyuncs.com'): idx = StringClient.index(endpoint, '.mgw-internal.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) if StringClient.has_suffix(endpoint, '-internal.oss-data-acc.aliyuncs.com'): idx = StringClient.index(endpoint, '-internal.oss-data-acc.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) if StringClient.has_suffix(endpoint, '.oss-dls.aliyuncs.com'): idx = StringClient.index(endpoint, '.oss-dls.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) return '' async def get_region_id_from_endpoint_async( self, endpoint: str, ) -> str: if not UtilClient.empty(endpoint): idx = -1 if StringClient.has_prefix(endpoint, 'oss-') and StringClient.has_suffix(endpoint, '.aliyuncs.com'): idx = StringClient.index(endpoint, '.aliyuncs.com') return StringClient.sub_string(endpoint, 4, idx) if StringClient.has_suffix(endpoint, '.mgw.aliyuncs.com'): idx = StringClient.index(endpoint, '.mgw.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) if StringClient.has_suffix(endpoint, '.mgw-internal.aliyuncs.com'): idx = StringClient.index(endpoint, '.mgw-internal.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) if StringClient.has_suffix(endpoint, '-internal.oss-data-acc.aliyuncs.com'): idx = StringClient.index(endpoint, '-internal.oss-data-acc.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) if StringClient.has_suffix(endpoint, '.oss-dls.aliyuncs.com'): idx = StringClient.index(endpoint, '.oss-dls.aliyuncs.com') return StringClient.sub_string(endpoint, 0, idx) return '' def get_endpoint( self, region_id: str, network: str, endpoint: str, ) -> str: if not UtilClient.empty(endpoint): return endpoint if UtilClient.empty(region_id): region_id = 'cn-hangzhou' if not UtilClient.empty(network): if StringClient.contains(network, 'internal'): return f'oss-{region_id}-internal.aliyuncs.com' elif StringClient.contains(network, 'ipv6'): return f'{region_id}oss.aliyuncs.com' elif StringClient.contains(network, 'accelerate'): return f'oss-{network}.aliyuncs.com' return f'oss-{region_id}.aliyuncs.com' async def get_endpoint_async( self, region_id: str, network: str, endpoint: str, ) -> str: if not UtilClient.empty(endpoint): return endpoint if UtilClient.empty(region_id): region_id = 'cn-hangzhou' if not UtilClient.empty(network): if StringClient.contains(network, 'internal'): return f'oss-{region_id}-internal.aliyuncs.com' elif StringClient.contains(network, 'ipv6'): return f'{region_id}oss.aliyuncs.com' elif StringClient.contains(network, 'accelerate'): return f'oss-{network}.aliyuncs.com' return f'oss-{region_id}.aliyuncs.com' def get_host( self, endpoint_type: str, bucket_name: str, endpoint: str, context: spi_models.InterceptorContext, ) -> str: if StringClient.contains(endpoint, '.mgw.aliyuncs.com') and not UtilClient.is_unset(context.request.host_map.get('userid')): return f"{context.request.host_map.get('userid')}.{endpoint}" if StringClient.contains(endpoint, '.mgw-internal.aliyuncs.com') and not UtilClient.is_unset(context.request.host_map.get('userid')): return f"{context.request.host_map.get('userid')}.{endpoint}" if UtilClient.empty(bucket_name): return endpoint host = f'{bucket_name}.{endpoint}' if not UtilClient.empty(endpoint_type): if StringClient.equals(endpoint_type, 'ip'): host = f'{endpoint}/{bucket_name}' elif StringClient.equals(endpoint_type, 'cname'): host = endpoint return host async def get_host_async( self, endpoint_type: str, bucket_name: str, endpoint: str, context: spi_models.InterceptorContext, ) -> str: if StringClient.contains(endpoint, '.mgw.aliyuncs.com') and not UtilClient.is_unset(context.request.host_map.get('userid')): return f"{context.request.host_map.get('userid')}.{endpoint}" if StringClient.contains(endpoint, '.mgw-internal.aliyuncs.com') and not UtilClient.is_unset(context.request.host_map.get('userid')): return f"{context.request.host_map.get('userid')}.{endpoint}" if UtilClient.empty(bucket_name): return endpoint host = f'{bucket_name}.{endpoint}' if not UtilClient.empty(endpoint_type): if StringClient.equals(endpoint_type, 'ip'): host = f'{endpoint}/{bucket_name}' elif StringClient.equals(endpoint_type, 'cname'): host = endpoint return host def get_authorization( self, signature_version: str, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], ak: str, secret: str, region_id: str, ) -> str: sign = '' if not UtilClient.is_unset(signature_version): if StringClient.equals(signature_version, 'v1'): sign = self.get_signature_v1(bucket_name, pathname, method, query, headers, secret) return f'OSS {ak}:{sign}' if StringClient.equals(signature_version, 'v2'): sign = self.get_signature_v2(bucket_name, pathname, method, query, headers, secret) return f'OSS2 AccessKeyId:{ak},Signature:{sign}' date_time = OpenApiUtilClient.get_timestamp() date_time = StringClient.replace(date_time, '-', '', None) date_time = StringClient.replace(date_time, ':', '', None) headers['x-oss-date'] = date_time headers['x-oss-content-sha256'] = 'UNSIGNED-PAYLOAD' only_date = StringClient.sub_string(date_time, 0, 8) cred = f'{ak}/{only_date}/{region_id}/oss/aliyun_v4_request' sign = self.get_signature_v4(bucket_name, pathname, method, query, headers, only_date, region_id, secret) return f'OSS4-HMAC-SHA256 Credential={cred}, Signature={sign}' async def get_authorization_async( self, signature_version: str, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], ak: str, secret: str, region_id: str, ) -> str: sign = '' if not UtilClient.is_unset(signature_version): if StringClient.equals(signature_version, 'v1'): sign = await self.get_signature_v1_async(bucket_name, pathname, method, query, headers, secret) return f'OSS {ak}:{sign}' if StringClient.equals(signature_version, 'v2'): sign = await self.get_signature_v2_async(bucket_name, pathname, method, query, headers, secret) return f'OSS2 AccessKeyId:{ak},Signature:{sign}' date_time = OpenApiUtilClient.get_timestamp() date_time = StringClient.replace(date_time, '-', '', None) date_time = StringClient.replace(date_time, ':', '', None) headers['x-oss-date'] = date_time headers['x-oss-content-sha256'] = 'UNSIGNED-PAYLOAD' only_date = StringClient.sub_string(date_time, 0, 8) cred = f'{ak}/{only_date}/{region_id}/oss/aliyun_v4_request' sign = await self.get_signature_v4_async(bucket_name, pathname, method, query, headers, only_date, region_id, secret) return f'OSS4-HMAC-SHA256 Credential={cred}, Signature={sign}' def get_sign_key( self, secret: str, only_date: str, region_id: str, ) -> bytes: temp = f'aliyun_v4{secret}' res = Signer.hmac_sha256sign(only_date, temp) res = Signer.hmac_sha256sign_by_bytes(region_id, res) res = Signer.hmac_sha256sign_by_bytes('oss', res) res = Signer.hmac_sha256sign_by_bytes('aliyun_v4_request', res) return res async def get_sign_key_async( self, secret: str, only_date: str, region_id: str, ) -> bytes: temp = f'aliyun_v4{secret}' res = Signer.hmac_sha256sign(only_date, temp) res = Signer.hmac_sha256sign_by_bytes(region_id, res) res = Signer.hmac_sha256sign_by_bytes('oss', res) res = Signer.hmac_sha256sign_by_bytes('aliyun_v4_request', res) return res def get_signature_v4( self, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], only_date: str, region_id: str, secret: str, ) -> str: signingkey = self.get_sign_key(secret, only_date, region_id) canonicalized_uri = pathname if not UtilClient.empty(pathname): if not UtilClient.empty(bucket_name): canonicalized_uri = f'/{bucket_name}{canonicalized_uri}' else: if not UtilClient.empty(bucket_name): canonicalized_uri = f'/{bucket_name}/' else: canonicalized_uri = '/' # for java: # String suffix = (!canonicalizedUri.equals("/") && canonicalizedUri.endsWith("/"))? "/" : ""; # canonicalizedUri = com.aliyun.openapiutil.Client.getEncodePath(canonicalizedUri) + suffix; canonicalized_uri = OpenApiUtilClient.get_encode_path(canonicalized_uri) query_map = {} for query_key in MapClient.key_set(query): query_value = None if not UtilClient.empty(query.get(query_key)): query_value = Encoder.percent_encode(query.get(query_key)) query_value = StringClient.replace(query_value, '+', '%20', None) query_key = Encoder.percent_encode(query_key) query_key = StringClient.replace(query_key, '+', '%20', None) # for go : queryMap[tea.StringValue(queryKey)] = queryValue query_map[query_key] = query_value canonicalized_query_string = self.build_canonicalized_query_string_v4(query_map) canonicalized_headers = self.build_canonicalized_headers_v4(headers) payload = 'UNSIGNED-PAYLOAD' canonical_request = f'{method}\n{canonicalized_uri}\n{canonicalized_query_string}\n{canonicalized_headers}\n\n{payload}' hex = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(canonical_request), 'ACS4-HMAC-SHA256')) scope = f'{only_date}/{region_id}/oss/aliyun_v4_request' string_to_sign = f"OSS4-HMAC-SHA256\n{headers.get('x-oss-date')}\n{scope}\n{hex}" signature = Signer.hmac_sha256sign_by_bytes(string_to_sign, signingkey) return Encoder.hex_encode(signature) async def get_signature_v4_async( self, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], only_date: str, region_id: str, secret: str, ) -> str: signingkey = await self.get_sign_key_async(secret, only_date, region_id) canonicalized_uri = pathname if not UtilClient.empty(pathname): if not UtilClient.empty(bucket_name): canonicalized_uri = f'/{bucket_name}{canonicalized_uri}' else: if not UtilClient.empty(bucket_name): canonicalized_uri = f'/{bucket_name}/' else: canonicalized_uri = '/' # for java: # String suffix = (!canonicalizedUri.equals("/") && canonicalizedUri.endsWith("/"))? "/" : ""; # canonicalizedUri = com.aliyun.openapiutil.Client.getEncodePath(canonicalizedUri) + suffix; canonicalized_uri = OpenApiUtilClient.get_encode_path(canonicalized_uri) query_map = {} for query_key in MapClient.key_set(query): query_value = None if not UtilClient.empty(query.get(query_key)): query_value = Encoder.percent_encode(query.get(query_key)) query_value = StringClient.replace(query_value, '+', '%20', None) query_key = Encoder.percent_encode(query_key) query_key = StringClient.replace(query_key, '+', '%20', None) # for go : queryMap[tea.StringValue(queryKey)] = queryValue query_map[query_key] = query_value canonicalized_query_string = await self.build_canonicalized_query_string_v4_async(query_map) canonicalized_headers = await self.build_canonicalized_headers_v4_async(headers) payload = 'UNSIGNED-PAYLOAD' canonical_request = f'{method}\n{canonicalized_uri}\n{canonicalized_query_string}\n{canonicalized_headers}\n\n{payload}' hex = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(canonical_request), 'ACS4-HMAC-SHA256')) scope = f'{only_date}/{region_id}/oss/aliyun_v4_request' string_to_sign = f"OSS4-HMAC-SHA256\n{headers.get('x-oss-date')}\n{scope}\n{hex}" signature = Signer.hmac_sha256sign_by_bytes(string_to_sign, signingkey) return Encoder.hex_encode(signature) def build_canonicalized_query_string_v4( self, query_map: Dict[str, str], ) -> str: canonicalized_query_string = '' if not UtilClient.is_unset(query_map): query_array = MapClient.key_set(query_map) sorted_query_array = ArrayClient.asc_sort(query_array) separator = '' for key in sorted_query_array: canonicalized_query_string = f'{canonicalized_query_string}{separator}{key}' if not UtilClient.empty(query_map.get(key)): canonicalized_query_string = f'{canonicalized_query_string}={query_map.get(key)}' separator = '&' return canonicalized_query_string async def build_canonicalized_query_string_v4_async( self, query_map: Dict[str, str], ) -> str: canonicalized_query_string = '' if not UtilClient.is_unset(query_map): query_array = MapClient.key_set(query_map) sorted_query_array = ArrayClient.asc_sort(query_array) separator = '' for key in sorted_query_array: canonicalized_query_string = f'{canonicalized_query_string}{separator}{key}' if not UtilClient.empty(query_map.get(key)): canonicalized_query_string = f'{canonicalized_query_string}={query_map.get(key)}' separator = '&' return canonicalized_query_string def build_canonicalized_headers_v4( self, headers: Dict[str, str], ) -> str: canonicalized_headers = '' headers_array = MapClient.key_set(headers) sorted_headers_array = ArrayClient.asc_sort(headers_array) for key in sorted_headers_array: lower_key = StringClient.to_lower(key) if StringClient.has_prefix(lower_key, 'x-oss-') or StringClient.equals(lower_key, 'content-type') or StringClient.equals(lower_key, 'content-md5'): canonicalized_headers = f'{canonicalized_headers}{key}:{StringClient.trim(headers.get(key))}\n' return canonicalized_headers async def build_canonicalized_headers_v4_async( self, headers: Dict[str, str], ) -> str: canonicalized_headers = '' headers_array = MapClient.key_set(headers) sorted_headers_array = ArrayClient.asc_sort(headers_array) for key in sorted_headers_array: lower_key = StringClient.to_lower(key) if StringClient.has_prefix(lower_key, 'x-oss-') or StringClient.equals(lower_key, 'content-type') or StringClient.equals(lower_key, 'content-md5'): canonicalized_headers = f'{canonicalized_headers}{key}:{StringClient.trim(headers.get(key))}\n' return canonicalized_headers def get_signature_v1( self, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], secret: str, ) -> str: resource = '' string_to_sign = '' if not UtilClient.empty(bucket_name): resource = f'/{bucket_name}' resource = f'{resource}{pathname}' canonicalized_resource = self.build_canonicalized_resource(resource, query) canonicalized_headers = self.build_canonicalized_headers(headers) string_to_sign = f'{method}\n{canonicalized_headers}{canonicalized_resource}' return Encoder.base_64encode_to_string(Signer.hmac_sha1sign(string_to_sign, secret)) async def get_signature_v1_async( self, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], secret: str, ) -> str: resource = '' string_to_sign = '' if not UtilClient.empty(bucket_name): resource = f'/{bucket_name}' resource = f'{resource}{pathname}' canonicalized_resource = await self.build_canonicalized_resource_async(resource, query) canonicalized_headers = await self.build_canonicalized_headers_async(headers) string_to_sign = f'{method}\n{canonicalized_headers}{canonicalized_resource}' return Encoder.base_64encode_to_string(Signer.hmac_sha1sign(string_to_sign, secret)) def build_canonicalized_resource( self, pathname: str, query: Dict[str, str], ) -> str: canonicalized_resource = pathname query_keys = MapClient.key_set(query) sorted_params = ArrayClient.asc_sort(query_keys) separator = '?' for param_name in sorted_params: if ArrayClient.contains(self._default_signed_params, param_name) or StringClient.has_prefix(param_name, 'x-oss-'): canonicalized_resource = f'{canonicalized_resource}{separator}{param_name}' if not UtilClient.empty(query.get(param_name)): canonicalized_resource = f'{canonicalized_resource}={query.get(param_name)}' separator = '&' return canonicalized_resource async def build_canonicalized_resource_async( self, pathname: str, query: Dict[str, str], ) -> str: canonicalized_resource = pathname query_keys = MapClient.key_set(query) sorted_params = ArrayClient.asc_sort(query_keys) separator = '?' for param_name in sorted_params: if ArrayClient.contains(self._default_signed_params, param_name) or StringClient.has_prefix(param_name, 'x-oss-'): canonicalized_resource = f'{canonicalized_resource}{separator}{param_name}' if not UtilClient.empty(query.get(param_name)): canonicalized_resource = f'{canonicalized_resource}={query.get(param_name)}' separator = '&' return canonicalized_resource def build_canonicalized_headers( self, headers: Dict[str, str], ) -> str: canonicalized_headers = '' content_type = headers.get('content-type') if UtilClient.is_unset(content_type): content_type = '' content_md_5 = headers.get('content-md5') if UtilClient.is_unset(content_md_5): content_md_5 = '' canonicalized_headers = f"{canonicalized_headers}{content_md_5}\n{content_type}\n{headers.get('date')}\n" keys = MapClient.key_set(headers) sorted_headers = ArrayClient.asc_sort(keys) for header in sorted_headers: if StringClient.contains(StringClient.to_lower(header), 'x-oss-') and not UtilClient.is_unset(headers.get(header)): canonicalized_headers = f'{canonicalized_headers}{header}:{headers.get(header)}\n' return canonicalized_headers async def build_canonicalized_headers_async( self, headers: Dict[str, str], ) -> str: canonicalized_headers = '' content_type = headers.get('content-type') if UtilClient.is_unset(content_type): content_type = '' content_md_5 = headers.get('content-md5') if UtilClient.is_unset(content_md_5): content_md_5 = '' canonicalized_headers = f"{canonicalized_headers}{content_md_5}\n{content_type}\n{headers.get('date')}\n" keys = MapClient.key_set(headers) sorted_headers = ArrayClient.asc_sort(keys) for header in sorted_headers: if StringClient.contains(StringClient.to_lower(header), 'x-oss-') and not UtilClient.is_unset(headers.get(header)): canonicalized_headers = f'{canonicalized_headers}{header}:{headers.get(header)}\n' return canonicalized_headers def get_signature_v2( self, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], secret: str, ) -> str: return '' async def get_signature_v2_async( self, bucket_name: str, pathname: str, method: str, query: Dict[str, str], headers: Dict[str, str], secret: str, ) -> str: return ''