python/alibabacloud_tea_openapi/utils.py (416 lines of code) (raw):
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
from __future__ import annotations
from darabonba.model import DaraModel
from typing import Dict, Any, List
import binascii
import datetime
import hashlib
import hmac
import base64
import copy
import platform
import time
import Tea
import threading
import random
import hashlib
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from urllib.parse import quote_plus, quote
from alibabacloud_tea_util.client import Client as Util
from darabonba.utils.stream import STREAM_CLASS
from datetime import datetime
from typing import Any, Dict, List
from .sm3 import hash_sm3, Sm3
# Millisecond wall-clock timestamp sampled once, when this module is imported.
_process_start_time = int(time.time() * 1000)
# Module-wide sequence counter; Utils.get_nonce() reads it and then increments
# it on every call.
_seqId = 0
def to_str(val):
    """
    Convert a value to text.

    @param val: any value; ``None`` is passed through unchanged
    @return: ``None`` for ``None``, the UTF-8 decoding for ``bytes``,
             otherwise ``str(val)``
    """
    if val is None:
        return None
    return str(val, encoding='utf-8') if isinstance(val, bytes) else str(val)
def rsa_sign(plaintext, secret):
    """
    Sign ``plaintext`` with an RSA private key using PKCS#1 v1.5 and SHA-256.

    The PEM armor lines are added around ``secret`` when they are missing,
    so the key body may be supplied with or without them.

    @param plaintext: bytes to sign
    @param secret: private key as bytes (PEM, armor optional)
    @return: the raw signature bytes
    """
    pem_header = b'-----BEGIN RSA PRIVATE KEY-----'
    pem_footer = b'-----END RSA PRIVATE KEY-----'
    pem = secret
    if not pem.startswith(pem_header):
        pem = b'%s\n%s' % (pem_header, pem)
    if not pem.endswith(pem_footer):
        pem = b'%s\n%s' % (pem, pem_footer)
    private_key = load_pem_private_key(
        pem, password=None, backend=default_backend()
    )
    return private_key.sign(plaintext, padding.PKCS1v15(), hashes.SHA256())
def signature_method(secret, source, sign_type):
    """
    Sign ``source`` with ``secret`` using the algorithm named by ``sign_type``.

    @param secret: signing secret (str); UTF-8 encoded before use
    @param source: text to sign (str); UTF-8 encoded before use
    @param sign_type: 'ACS3-HMAC-SHA256', 'ACS3-HMAC-SM3' or 'ACS3-RSA-SHA256'
    @return: the raw signature bytes, or None for any other sign_type
    """
    message = source.encode('utf-8')
    key = secret.encode('utf-8')
    if sign_type == 'ACS3-RSA-SHA256':
        return rsa_sign(message, key)
    if sign_type == 'ACS3-HMAC-SHA256':
        digestmod = hashlib.sha256
    elif sign_type == 'ACS3-HMAC-SM3':
        digestmod = Sm3
    else:
        # Unknown algorithm: nothing is signed.
        return None
    return hmac.new(key, message, digestmod).digest()
def get_canonical_query_string(query):
    """
    Build the canonical query string used in the ACS3 canonical request.

    Entries whose value is None are skipped; the remaining keys are sorted
    and emitted as ``key=value`` joined by '&'. Values are percent-encoded
    (only '~' and unreserved characters stay literal); keys are emitted as-is.

    @param query: mapping of query parameter names to string values, or None
    @return: the canonical query string ('' when there is nothing to emit)
    """
    if not query:
        return ''
    present_keys = sorted(k for k, v in query.items() if v is not None)
    encoded_pairs = []
    for name in present_keys:
        encoded_value = quote(query[name], safe='~', encoding='utf-8')
        encoded_pairs.append(f'{name}={encoded_value}')
    return '&'.join(encoded_pairs)
def get_canonicalized_headers(headers):
    """
    Build the canonical header block and the signed-header list.

    Header names are lower-cased; headers with a None value are ignored.
    Names that collide after lower-casing have their (stripped) values
    sorted and joined with ','.

    @param headers: mapping of header names to values
    @return: tuple ``(canonical_headers, signed_headers)`` where the first is
             one ``name:value\\n`` line per header in sorted name order and
             the second is the sorted names joined by ';'
    """
    values_by_name = {}
    for raw_name, raw_value in headers.items():
        if raw_value is None:
            continue
        values_by_name.setdefault(raw_name.lower(), []).append(
            to_str(raw_value).strip()
        )
    ordered_names = sorted(values_by_name)
    lines = []
    for name in ordered_names:
        joined_values = ','.join(sorted(values_by_name[name]))
        lines.append(f'{name}:{joined_values}\n')
    return ''.join(lines), ';'.join(ordered_names)
class Utils(object):
    """
    Static helpers for OpenAPI requests: model conversion, parameter
    flattening, request signing (ROA, RPC and ACS3 styles), endpoint
    resolution and assorted formatting utilities.
    """

    @staticmethod
    def convert(body, content):
        """
        Convert all params of body other than type of readable into content
        @param body: source Model; read through ``to_map()``
        @param content: target Model; populated through ``from_map()``
        @return: void
        """
        body_map = Utils._except_stream(body.to_map())
        content.from_map(body_map)

    @staticmethod
    def _except_stream(val):
        """
        Recursively remove stream objects from a nested dict/list structure.
        @param val: dict, list, stream or plain value
        @return: a rebuilt structure; a stream becomes None as a dict value
                 and is dropped from a list, while None list items are kept
        """
        if isinstance(val, dict):
            return {k: Utils._except_stream(v) for k, v in val.items()}
        if isinstance(val, list):
            result = []
            for item in val:
                if item is None:
                    # Explicit None entries are preserved as-is.
                    result.append(Utils._except_stream(item))
                    continue
                cleaned = Utils._except_stream(item)
                if cleaned is not None:
                    result.append(cleaned)
            return result
        if isinstance(val, STREAM_CLASS):
            return None
        return val

    @staticmethod
    def _get_canonicalized_headers(headers):
        """
        Render the 'x-acs-' headers for the ROA string to sign.
        @param headers: mapping of header names to values
        @return: one ``name:value\\n`` line per key starting with 'x-acs-',
                 in sorted key order
        """
        canon_keys = sorted(k for k in headers if k.startswith('x-acs-'))
        return ''.join('%s:%s\n' % (k, headers[k]) for k in canon_keys)

    @staticmethod
    def _get_canonicalized_resource(pathname, query):
        """
        Render the path plus sorted query for the ROA string to sign.
        @param pathname: request path
        @param query: mapping of query names to values (None/empty allowed)
        @return: ``pathname`` alone when no query entry is emitted, otherwise
                 ``pathname?k1=v1&k2`` (None values skipped, '' values
                 emitted as the bare key)
        """
        if not query:
            return pathname
        parts = []
        for key in sorted(query):
            value = query[key]
            if value is None:
                continue
            parts.append('%s' % key if value == '' else '%s=%s' % (key, value))
        if not parts:
            return pathname
        return '%s?%s' % (pathname, '&'.join(parts))

    @staticmethod
    def get_string_to_sign(request):
        """
        Get the string to be signed according to request
        @param request: which contains signed messages; its ``method``,
                        ``pathname``, ``headers`` and ``query`` are read
        @return: the signed string
        """
        method, pathname = request.method, request.pathname
        headers, query = request.headers, request.query

        def _header_or_empty(name):
            # Missing (None) headers contribute an empty line.
            value = headers.get(name)
            return '' if value is None else value

        header = '%s\n%s\n%s\n%s\n%s\n' % (
            method,
            _header_or_empty('accept'),
            _header_or_empty('content-md5'),
            _header_or_empty('content-type'),
            _header_or_empty('date'),
        )
        canon_headers = Utils._get_canonicalized_headers(headers)
        canon_resource = Utils._get_canonicalized_resource(pathname, query)
        return header + canon_headers + canon_resource

    @staticmethod
    def get_roasignature(string_to_sign, secret):
        """
        Get signature according to stringToSign, secret
        @type string_to_sign: str
        @param string_to_sign: the signed string
        @type secret: str
        @param secret: accesskey secret
        @return: the signature (Base64 of the HMAC-SHA1 digest)
        """
        digest = hmac.new(
            secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            hashlib.sha1,
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    @staticmethod
    def _object_handler(key, value, out):
        """
        Flatten ``value`` into ``out`` using dotted keys.
        @param key: key prefix accumulated so far (a leading '.' is removed
                    before a leaf is stored)
        @param value: dict, DaraModel, list/tuple or leaf; None is ignored
        @param out: dict that receives ``flattened_key -> str`` entries;
                    list/tuple positions are numbered from 1 and stream
                    leaves are skipped
        """
        if value is None:
            return
        if isinstance(value, dict):
            for k, v in value.items():
                Utils._object_handler('%s.%s' % (key, k), v, out)
        elif isinstance(value, DaraModel):
            for k, v in value.to_map().items():
                Utils._object_handler('%s.%s' % (key, k), v, out)
        elif isinstance(value, (list, tuple)):
            for index, val in enumerate(value):
                Utils._object_handler('%s.%s' % (key, index + 1), val, out)
        else:
            if key.startswith('.'):
                key = key[1:]
            if isinstance(value, bytes):
                out[key] = str(value, encoding='utf-8')
            elif not isinstance(value, STREAM_CLASS):
                out[key] = str(value)

    @staticmethod
    def to_form(filter):
        """
        Parse filter into a form string
        @type filter: dict
        @param filter: object
        @return: the string
        """
        result = {}
        if filter:
            Utils._object_handler('', filter, result)
        return Util.to_form_string(
            Util.anyify_map_value(result)
        )

    @staticmethod
    def get_timestamp():
        """
        Get timestamp
        @return: the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``
        """
        # time.gmtime() yields UTC without the deprecated datetime.utcnow().
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    @staticmethod
    def query(filter):
        """
        Parse filter into a object which's type is map[string]string
        @type filter: dict
        @param filter: query param
        @return: the object
        """
        out_dict = {}
        if filter:
            Utils._object_handler('', filter, out_dict)
        return out_dict

    @staticmethod
    def get_rpcsignature(signed_params, method, secret):
        """
        Get signature according to signedParams, method and secret
        @type signed_params: dict
        @param signed_params: params which need to be signed; entries whose
                              value is None are left out
        @type method: str
        @param method: http method e.g. GET
        @type secret: str
        @param secret: AccessKeySecret
        @return: the signature (Base64 of the HMAC-SHA1 digest)
        """
        encoded_pairs = [
            f'{quote(k, safe="~", encoding="utf-8")}='
            f'{quote(signed_params[k], safe="~", encoding="utf-8")}'
            for k in sorted(signed_params.keys())
            if signed_params[k] is not None
        ]
        canonicalized_query_string = '&'.join(encoded_pairs)
        string_to_sign = (
            f'{method}&%2F&'
            f'{quote_plus(canonicalized_query_string, safe="~", encoding="utf-8")}'
        )
        # The signing key is the secret followed by a literal '&'.
        digest_maker = hmac.new(bytes(secret + '&', encoding="utf-8"),
                                bytes(string_to_sign, encoding="utf-8"),
                                digestmod=hashlib.sha1)
        return str(base64.b64encode(digest_maker.digest()), encoding="utf-8")

    @staticmethod
    def array_to_string_with_specified_style(array, prefix, style):
        """
        Parse array into a string with specified style
        @type array: any
        @param array: the array
        @type prefix: str
        @param prefix: the prefix string (used by the repeatList style)
        @param style: specified style: repeatList, simple, spaceDelimited,
                      pipeDelimited or json
        @return: the string ('' for a None array or an unknown style)
        """
        if array is None:
            return ''
        if style == 'repeatList':
            return Utils._flat_repeat_list({prefix: array})
        if style == 'json':
            return Util.to_jsonstring(Utils._parse_to_dict(array))
        delimiters = {
            'simple': ',',
            'spaceDelimited': ' ',
            'pipeDelimited': '|',
        }
        delimiter = delimiters.get(style)
        if delimiter is None:
            return ''
        return delimiter.join(map(str, array))

    @staticmethod
    def _flat_repeat_list(dic):
        """
        Flatten ``dic`` and render it as URL-encoded ``key=value`` pairs.
        @param dic: mapping to flatten with dotted, 1-based keys
        @return: the pairs in sorted key order, joined by '&&'
        """
        query = {}
        if dic:
            Utils._object_handler('', dic, query)
        pairs = [
            quote_plus(k, encoding='utf-8') + '=' + quote_plus(query[k], encoding='utf-8')
            for k in sorted(query)
        ]
        return '&&'.join(pairs)

    @staticmethod
    def parse_to_map(inp):
        """
        Transform input as map.
        @param inp: dict, list or model to convert
        @return: a deep copy of the converted structure, or None when the
                 conversion raises TypeError
        """
        try:
            result = Utils._parse_to_dict(inp)
            return copy.deepcopy(result)
        except TypeError:
            return

    @staticmethod
    def _parse_to_dict(val):
        """
        Recursively replace models inside dicts/lists by their ``to_map()``.
        @param val: dict, list or DaraModel
        @return: the converted structure; None for any other input type
        """
        def _convert(item):
            # Only containers and models are descended into.
            if isinstance(item, (list, dict, DaraModel)):
                return Utils._parse_to_dict(item)
            return item

        if isinstance(val, dict):
            return {k: _convert(v) for k, v in val.items()}
        if isinstance(val, list):
            return [_convert(i) for i in val]
        if isinstance(val, DaraModel):
            return val.to_map()

    @staticmethod
    def get_endpoint(endpoint, server_use, endpoint_type):
        """
        If endpointType is internal, use internal endpoint
        If serverUse is true and endpointType is accelerate, use accelerate endpoint
        Default return endpoint
        @param endpoint: the base endpoint
        @param server_use whether use accelerate endpoint
        @param endpoint_type value must be internal or accelerate
        @return the final endpoint
        """
        if endpoint_type == "internal":
            # Append '-internal' to the first dot-separated label.
            str_split = endpoint.split('.')
            str_split[0] += "-internal"
            endpoint = ".".join(str_split)
        if server_use and endpoint_type == "accelerate":
            return "oss-accelerate.aliyuncs.com"
        return endpoint

    @staticmethod
    def hash(raw, sign_type):
        """
        Hash ``raw`` with the digest that belongs to ``sign_type``.
        @param raw: bytes to hash
        @param sign_type: 'ACS3-HMAC-SHA256' / 'ACS3-RSA-SHA256' (SHA-256)
                          or 'ACS3-HMAC-SM3' (SM3)
        @return: the digest, or None for any other sign_type
        """
        if sign_type in ('ACS3-HMAC-SHA256', 'ACS3-RSA-SHA256'):
            return hashlib.sha256(raw).digest()
        if sign_type == 'ACS3-HMAC-SM3':
            return hash_sm3(raw)

    @staticmethod
    def hex_encode(raw):
        """
        Hex-encode bytes.
        @param raw: bytes to encode
        @return: lowercase hex string, or None when ``raw`` is empty or None
        """
        if raw:
            return binascii.b2a_hex(raw).decode('utf-8')

    @staticmethod
    def get_authorization(request, sign_type, payload, ak, secret):
        """
        Build the ACS3 Authorization header value for ``request``.
        @param request: request whose method, pathname, query and headers
                        are signed
        @param sign_type: signature algorithm name, e.g. 'ACS3-HMAC-SHA256'
        @param payload: hashed request payload placed last in the
                        canonical request
        @param ak: access key id
        @param secret: access key secret
        @return: ``<sign_type> Credential=..,SignedHeaders=..,Signature=..``
        """
        canonical_uri = request.pathname if request.pathname else '/'
        canonicalized_query = get_canonical_query_string(request.query)
        canonicalized_headers, signed_headers = get_canonicalized_headers(request.headers)

        canonical_request = f'{request.method}\n' \
                            f'{canonical_uri}\n' \
                            f'{canonicalized_query}\n' \
                            f'{canonicalized_headers}\n' \
                            f'{signed_headers}\n' \
                            f'{payload}'

        request_digest = Utils.hex_encode(
            Utils.hash(canonical_request.encode("utf-8"), sign_type)
        )
        str_to_sign = f'{sign_type}\n{request_digest}'
        signature = Utils.hex_encode(signature_method(secret, str_to_sign, sign_type))
        return f'{sign_type} Credential={ak},SignedHeaders={signed_headers},Signature={signature}'

    @staticmethod
    def get_encode_path(path):
        """
        Percent-encode a URL path, leaving '/' and '~' literal.
        @param path: the path string
        @return: the encoded path
        """
        return quote(path, safe='/~', encoding="utf-8")

    @staticmethod
    def get_encode_param(param):
        """
        Percent-encode a parameter, leaving only '~' literal.
        @param param: the parameter string
        @return: the encoded parameter
        """
        return quote(param, safe='~', encoding="utf-8")

    @staticmethod
    def get_nonce() -> str:
        """
        Generate a nonce string
        The nonce is the MD5 hex digest of the module's import-time
        timestamp, the calling thread id, the current time in milliseconds,
        a module-wide sequence number and 64 random bits.
        @return: the nonce string
        """
        global _seqId
        thread_id = threading.get_ident()
        current_time = int(time.time() * 1000)
        seq = _seqId
        _seqId += 1
        rand_num = random.getrandbits(64)
        # Use the module-level _process_start_time captured at import time;
        # re-sampling the clock here would shadow it with a local and merely
        # duplicate current_time.
        msg = f'{_process_start_time}-{thread_id}-{current_time}-{seq}-{rand_num}'
        return hashlib.md5(msg.encode('utf-8')).hexdigest()

    @staticmethod
    def get_date_utcstring() -> str:
        """
        Get an UTC format string by current date, e.g. 'Thu, 06 Feb 2020 07:32:54 GMT'
        @return: the UTC format string
        """
        # time.gmtime() yields UTC without the deprecated datetime.utcnow().
        return time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())

    @staticmethod
    def stringify_map_value(
        m: Dict[str, Any],
    ) -> Dict[str, str]:
        """
        Stringify the value of map
        @param m: the source map (None is treated as empty)
        @return: the new stringified map; bytes are UTF-8 decoded, other
                 values go through ``str()`` and None values stay None
        """
        if m is None:
            return {}
        dic_result = {}
        for k, v in m.items():
            if v is None:
                dic_result[k] = None
            elif isinstance(v, bytes):
                dic_result[k] = v.decode('utf-8')
            else:
                dic_result[k] = str(v)
        return dic_result

    @staticmethod
    def to_array(
        input: Any,
    ) -> List[Dict[str, Any]]:
        """
        Transform input as array.
        @param input: iterable of items, or None
        @return: a new list in which every DaraModel item is replaced by its
                 ``to_map()`` result ([] for None)
        """
        if input is None:
            return []
        return [i.to_map() if isinstance(i, DaraModel) else i for i in input]

    @staticmethod
    def __get_default_agent():
        """
        Build the default user agent from platform, Python and Tea versions.
        @return: the default user agent string
        """
        return f'AlibabaCloud ({platform.system()}; {platform.machine()}) ' \
               f'Python/{platform.python_version()} Core/{Tea.__version__} TeaDSL/2'

    @staticmethod
    def get_user_agent(
        user_agent: str,
    ) -> str:
        """
        Get user agent, if it userAgent is not null, splice it with defaultUserAgent and return, otherwise return defaultUserAgent
        @param user_agent: caller-supplied suffix (may be empty or None)
        @return: the string value
        """
        default_agent = Utils.__get_default_agent()
        if user_agent:
            return f'{default_agent} {user_agent}'
        return default_agent

    @staticmethod
    def get_endpoint_rules(product, region_id, endpoint_type, network, suffix=None):
        """
        Compose an ``aliyuncs.com`` endpoint from product, region and network.
        @param product: product code; lower-cased (None is treated as '')
        @param region_id: region id; required when endpoint_type is 'regional'
        @param endpoint_type: 'regional' inserts the region into the host
        @param network: network name; '' / None / 'public' add nothing,
                        anything else is appended to the product as '-<network>'
        @param suffix: accepted but not used by this implementation
        @return: the endpoint host name
        @raise RuntimeError: endpoint_type is 'regional' and region_id is empty
        """
        product = product or ""
        network = network or ""
        if endpoint_type == "regional":
            if region_id is None or region_id == "":
                raise RuntimeError(
                    "RegionId is empty, please set a valid RegionId")
            result = "<product><network>.<region_id>.aliyuncs.com".replace(
                "<region_id>", region_id)
        else:
            result = "<product><network>.aliyuncs.com"
        result = result.replace("<product>", product.lower())
        if network == "" or network == "public":
            network_part = ""
        else:
            network_part = "-" + network
        return result.replace("<network>", network_part)

    @staticmethod
    def get_throttling_time_left(headers: Dict[str, str]) -> int:
        """
        Get throttling time left based on the response headers
        @param headers: The response headers
        @return: Remaining time in milliseconds before the throttle is lifted
        """
        time_left_user_api = Utils._get_time_left(headers.get("x-ratelimit-user-api"))
        time_left_user = Utils._get_time_left(headers.get("x-ratelimit-user"))
        return max(time_left_user_api, time_left_user)

    @staticmethod
    def _get_time_left(rate_limit: str) -> int:
        """
        Extract time left from rate limit string
        @param rate_limit: Rate limit string from headers, a comma-separated
                           list of ``Key:Value`` pairs
        @return: Time left in milliseconds (0 when there is no TimeLeft pair)
        @raise ValueError: the TimeLeft value is not an integer
        """
        if not rate_limit:
            return 0
        for pair in rate_limit.split(','):
            # partition() tolerates unrelated pairs that have no ':' (or
            # several), which a two-name unpack of split(':') would reject.
            key, sep, value = pair.partition(':')
            if sep and key.strip() == 'TimeLeft':
                return int(value.strip())
        return 0

    @staticmethod
    def flat_map(params: Dict[str, Any], prefix: str = '') -> Dict[str, str]:
        """
        Flatten the dictionary with a given prefix
        @param params: Dictionary to flatten
        @param prefix: Prefix for keys in the flattened dictionary
        @return: A flattened dictionary; nested keys are joined with '.',
                 list positions are numbered from 1 and leaves go
                 through ``str()``
        """
        flat_result = {}

        def _flatten(current_params, current_prefix):
            if isinstance(current_params, dict):
                for k, v in current_params.items():
                    new_key = f"{current_prefix}.{k}" if current_prefix else k
                    _flatten(v, new_key)
            elif isinstance(current_params, list):
                for index, item in enumerate(current_params):
                    _flatten(item, f"{current_prefix}.{index + 1}")
            else:
                flat_result[current_prefix] = str(current_params)

        _flatten(params, prefix)
        return flat_result

    @staticmethod
    def map_to_flat_style(input: Any) -> Any:
        """
        Convert input to a flat style
        @param input: Input to convert
        @return: for a dict the ``flat_map`` result, for a list a dict
                 mapping 1-based int positions to ``str(item)``, otherwise
                 ``str(input)``
        """
        if isinstance(input, dict):
            return Utils.flat_map(input)
        if isinstance(input, list):
            return {index + 1: str(item) for index, item in enumerate(input)}
        return str(input)