alibabacloud-gateway-pds/python2/alibabacloud_gateway_pds/client.py (158 lines of code) (raw):
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
from __future__ import unicode_literals
from Tea.exceptions import TeaException
from alibabacloud_darabonba_signature_util.signer import Signer
from alibabacloud_darabonba_encode_util.encoder import Encoder
from Tea.core import TeaCore
from Tea.converter import TeaConverter
from alibabacloud_gateway_spi.client import Client as SPIClient
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
from alibabacloud_darabonba_string.client import Client as StringClient
from alibabacloud_darabonba_map.client import Client as MapClient
from alibabacloud_darabonba_array.client import Client as ArrayClient
class Client(SPIClient):
def __init__(self):
super(Client, self).__init__()
def modify_configuration(self, context, attribute_map):
config = context.configuration
config.endpoint = self.get_endpoint(config.network, config.endpoint)
def modify_request(self, context, attribute_map):
request = context.request
config = context.configuration
request.headers = TeaCore.merge({
'date': UtilClient.get_date_utcstring(),
'host': config.endpoint,
'x-acs-version': request.version,
'x-acs-action': request.action,
'user-agent': request.user_agent,
'x-acs-signature-nonce': UtilClient.get_nonce(),
'x-acs-signature-method': 'HMAC-SHA1',
'x-acs-signature-version': '1.0',
'accept': 'application/json'
}, request.headers)
if not UtilClient.is_unset(request.stream):
tmp = UtilClient.read_as_bytes(request.stream)
request.stream = tmp
request.headers['content-type'] = 'application/octet-stream'
else:
if not UtilClient.is_unset(request.body):
if UtilClient.equal_string(request.req_body_type, 'json'):
json_obj = UtilClient.to_jsonstring(request.body)
request.stream = json_obj
request.headers['content-type'] = 'application/json; charset=utf-8'
else:
m = UtilClient.assert_as_map(request.body)
form_obj = OpenApiUtilClient.to_form(m)
request.stream = form_obj
request.headers['content-type'] = 'application/x-www-form-urlencoded'
if not UtilClient.equal_string(request.auth_type, 'Anonymous') and not UtilClient.is_unset(request.credential):
credential = request.credential
auth_type = credential.get_type()
if UtilClient.equal_string(auth_type, 'bearer'):
bearer_token = credential.get_bearer_token()
request.headers['x-acs-bearer-token'] = bearer_token
request.headers['Authorization'] = 'Bearer %s' % TeaConverter.to_unicode(bearer_token)
else:
access_key_id = credential.get_access_key_id()
access_key_secret = credential.get_access_key_secret()
security_token = credential.get_security_token()
if not UtilClient.empty(security_token):
request.headers['x-acs-security-token'] = security_token
request.headers['Authorization'] = self.get_authorization(request.pathname, request.method, request.query, request.headers, access_key_id, access_key_secret)
def modify_response(self, context, attribute_map):
request = context.request
response = context.response
if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code):
_res = UtilClient.read_as_json(response.body)
err = UtilClient.assert_as_map(_res)
headers = response.headers
request_id = headers.get('x-ca-request-id')
err['statusCode'] = response.status_code
raise TeaException({
'code': '%s' % TeaConverter.to_unicode(self.default_any(err.get('Code'), err.get('code'))),
'message': 'code: %s, %s request id: %s' % (TeaConverter.to_unicode(response.status_code), TeaConverter.to_unicode(self.default_any(err.get('Message'), err.get('message'))), TeaConverter.to_unicode(request_id)),
'data': err
})
if not UtilClient.is_unset(response.body):
if UtilClient.equal_number(response.status_code, 204):
UtilClient.read_as_string(response.body)
elif UtilClient.equal_string(request.body_type, 'binary'):
response.deserialized_body = response.body
elif UtilClient.equal_string(request.body_type, 'byte'):
byt = UtilClient.read_as_bytes(response.body)
response.deserialized_body = byt
elif UtilClient.equal_string(request.body_type, 'string'):
str = UtilClient.read_as_string(response.body)
response.deserialized_body = str
elif UtilClient.equal_string(request.body_type, 'json'):
response.deserialized_body = UtilClient.read_as_json(response.body)
elif UtilClient.equal_string(request.body_type, 'array'):
response.deserialized_body = UtilClient.read_as_json(response.body)
else:
response.deserialized_body = UtilClient.read_as_string(response.body)
def get_endpoint(self, network, endpoint):
real_endpoint = 'api.aliyunpds.com'
if not UtilClient.empty(endpoint):
real_endpoint = endpoint
if not UtilClient.empty(network) and StringClient.equals(network, 'vpc'):
real_endpoint = StringClient.replace(real_endpoint, 'api.aliyunpds.com', 'api-vpc.aliyunpds.com', None)
real_endpoint = StringClient.replace(real_endpoint, 'admin.aliyunpds.com', 'admin-vpc.aliyunpds.com', None)
return real_endpoint
def default_any(self, input_value, default_value):
if UtilClient.is_unset(input_value):
return default_value
return input_value
def get_authorization(self, pathname, method, query, headers, ak, secret):
signature = self.get_signature(pathname, method, query, headers, secret)
return 'acs %s:%s' % (TeaConverter.to_unicode(ak), TeaConverter.to_unicode(signature))
def get_signature(self, pathname, method, query, headers, secret):
string_to_sign = ''
canonicalized_resource = self.build_canonicalized_resource(pathname, query)
canonicalized_headers = self.build_canonicalized_headers(headers)
string_to_sign = '%s\n%s%s' % (TeaConverter.to_unicode(method), TeaConverter.to_unicode(canonicalized_headers), TeaConverter.to_unicode(canonicalized_resource))
signature = Signer.hmac_sha1sign(string_to_sign, secret)
return Encoder.base_64encode_to_string(signature)
def build_canonicalized_resource(self, pathname, query):
canonicalized_resource = pathname
if not UtilClient.is_unset(query):
query_array = MapClient.key_set(query)
sorted_query_array = ArrayClient.asc_sort(query_array)
separator = '?'
for key in sorted_query_array:
canonicalized_resource = '%s%s%s' % (TeaConverter.to_unicode(canonicalized_resource), TeaConverter.to_unicode(separator), TeaConverter.to_unicode(key))
if not UtilClient.empty(query.get(key)):
canonicalized_resource = '%s=%s' % (TeaConverter.to_unicode(canonicalized_resource), TeaConverter.to_unicode(query.get(key)))
separator = '&'
return canonicalized_resource
def build_canonicalized_headers(self, headers):
accept = headers.get('accept')
if UtilClient.is_unset(accept):
accept = ''
content_md_5 = headers.get('content-md5')
if UtilClient.is_unset(content_md_5):
content_md_5 = ''
content_type = headers.get('content-type')
if UtilClient.is_unset(content_type):
content_type = ''
date = headers.get('date')
if UtilClient.is_unset(date):
date = ''
canonicalized_headers = '%s\n%s\n%s\n%s\n' % (TeaConverter.to_unicode(accept), TeaConverter.to_unicode(content_md_5), TeaConverter.to_unicode(content_type), TeaConverter.to_unicode(date))
sorted_headers = self.get_signed_headers(headers)
for header in sorted_headers:
value = headers.get(header)
value_trim = StringClient.trim(value)
canonicalized_headers = '%s%s:%s\n' % (TeaConverter.to_unicode(canonicalized_headers), TeaConverter.to_unicode(header), TeaConverter.to_unicode(value_trim))
return canonicalized_headers
def get_signed_headers(self, headers):
headers_array = MapClient.key_set(headers)
sorted_headers_array = ArrayClient.asc_sort(headers_array)
tmp = ''
separator = ''
for key in sorted_headers_array:
lower_key = StringClient.to_lower(key)
if StringClient.has_prefix(lower_key, 'x-acs-'):
if not StringClient.contains(tmp, lower_key):
tmp = '%s%s%s' % (TeaConverter.to_unicode(tmp), TeaConverter.to_unicode(separator), TeaConverter.to_unicode(lower_key))
separator = ';'
return StringClient.split(tmp, ';', None)