# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
from __future__ import annotations
from darabonba.model import DaraModel
from typing import Dict, Any, List
import binascii
import datetime
import hashlib
import hmac
import base64
import copy
import platform
import time
import Tea
import threading
import random
import hashlib
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from urllib.parse import quote_plus, quote
from alibabacloud_tea_util.client import Client as Util
from darabonba.utils.stream import STREAM_CLASS
from datetime import datetime
from typing import Any, Dict, List
from .sm3 import hash_sm3, Sm3

# Wall-clock time in milliseconds, captured once when this module is imported.
_process_start_time = int(time.time() * 1000)
# Sequence counter: Utils.get_nonce() reads it and then increments it on every call.
_seqId = 0

def to_str(val):
    """
    Coerce a value to text.

    @param val: any value; bytes are decoded as UTF-8, everything else goes through str()

    @return: None when val is None, otherwise the text form of val
    """
    if val is None:
        return None
    return str(val, encoding='utf-8') if isinstance(val, bytes) else str(val)


def rsa_sign(plaintext, secret):
    """
    Sign plaintext with an RSA private key using PKCS#1 v1.5 padding and SHA-256.

    @param plaintext: the bytes to sign

    @param secret: the private key as bytes; the PEM "RSA PRIVATE KEY" header and
                   footer lines are added when the value does not already start /
                   end with them

    @return: the raw signature bytes
    """
    pem_header = b'-----BEGIN RSA PRIVATE KEY-----'
    pem_footer = b'-----END RSA PRIVATE KEY-----'

    pem = secret
    if not pem.startswith(pem_header):
        pem = pem_header + b'\n' + pem
    if not pem.endswith(pem_footer):
        pem = pem + b'\n' + pem_footer

    private_key = load_pem_private_key(pem, password=None, backend=default_backend())
    return private_key.sign(plaintext, padding.PKCS1v15(), hashes.SHA256())


def signature_method(secret, source, sign_type):
    """
    Sign source with secret using the algorithm selected by sign_type.

    @param secret: the signing secret (str); for 'ACS3-RSA-SHA256' it is handed to rsa_sign as the key

    @param source: the text to sign (str)

    @param sign_type: 'ACS3-HMAC-SHA256', 'ACS3-HMAC-SM3' or 'ACS3-RSA-SHA256'

    @return: the raw signature bytes, or None for any other sign_type
    """
    # Both inputs are encoded up front, before the sign type is inspected.
    message = source.encode('utf-8')
    key = secret.encode('utf-8')

    if sign_type == 'ACS3-RSA-SHA256':
        return rsa_sign(message, key)

    if sign_type == 'ACS3-HMAC-SHA256':
        digestmod = hashlib.sha256
    elif sign_type == 'ACS3-HMAC-SM3':
        digestmod = Sm3
    else:
        return None
    return hmac.new(key, message, digestmod).digest()


def get_canonical_query_string(query):
    """
    Build the canonical query string used in the canonical request.

    Entries whose value is None are left out. The remaining keys are sorted and
    each one is emitted as ``key=value``, with the value percent-encoded by
    quote() keeping only '~' as an extra safe character. An empty value
    therefore yields ``key=``. Keys are written as given, without encoding.

    @param query: dict of query parameters (values are passed to quote()), or None

    @return: the '&'-joined canonical query string, '' when there is nothing to emit
    """
    if not query:
        return ''

    present_keys = sorted(k for k, v in query.items() if v is not None)
    # quote() always returns a string, so every surviving key becomes a
    # 'key=value' pair; there is no value-less form to special-case.
    return '&'.join(
        f'{key}={quote(query[key], safe="~", encoding="utf-8")}'
        for key in present_keys
    )


def get_canonicalized_headers(headers):
    """
    Build the canonical header block and the signed-header list.

    Header names are lower-cased; headers whose value is None are ignored.
    Values that end up under the same lower-cased name are stripped, sorted
    and joined with ','.

    @param headers: dict of header name -> value

    @return: a tuple (canonical_headers, signed_headers): one 'name:value\\n'
             line per name in sorted order, and the sorted names joined by ';'
    """
    grouped = {}
    for name, value in headers.items():
        if value is None:
            continue
        grouped.setdefault(name.lower(), []).append(to_str(value).strip())

    signed_names = sorted(grouped)
    lines = []
    for name in signed_names:
        joined_values = ','.join(sorted(grouped[name]))
        lines.append(f'{name}:{joined_values}\n')
    return ''.join(lines), ';'.join(signed_names)


class Utils(object):
    """
    This is for OpenApi Util
    """

    @staticmethod
    def convert(body, content):
        """
        Copy the fields of body into content, leaving out stream values

        @param body: source Model, read through its to_map()

        @param content: target Model, filled through its from_map()

        @return: void
        """
        source_fields = body.to_map()
        content.from_map(Utils._except_stream(source_fields))

    @staticmethod
    def _except_stream(val):
        if isinstance(val, dict):
            result = {}
            for k, v in val.items():
                result[k] = Utils._except_stream(v)
            return result
        elif isinstance(val, list):
            result = []
            for i in val:
                if i is not None:
                    item = Utils._except_stream(i)
                    if item is not None:
                        result.append(item)
                else:
                    result.append(Utils._except_stream(i))
            return result
        elif isinstance(val, STREAM_CLASS):
            return None
        return val

    @staticmethod
    def _get_canonicalized_headers(headers):
        canon_keys = []
        for k in headers:
            if k.startswith('x-acs-'):
                canon_keys.append(k)
        canon_keys = sorted(canon_keys)
        canon_header = ''
        for k in canon_keys:
            canon_header += '%s:%s\n' % (k, headers[k])
        return canon_header

    @staticmethod
    def _get_canonicalized_resource(pathname, query):
        if len(query) <= 0:
            return pathname
        resource = '%s?' % pathname
        query_list = sorted(list(query))
        for key in query_list:
            if query[key] is not None:
                if query[key] == '':
                    s = '%s&' % key
                else:
                    s = '%s=%s&' % (key, query[key])
                resource += s
        return resource[:-1]

    @staticmethod
    def get_string_to_sign(request):
        """
        Get the string to be signed according to request

        @param request:  which contains signed messages

        @return: the signed string
        """
        method, pathname, headers, query = request.method, request.pathname, request.headers, request.query

        accept = '' if headers.get('accept') is None else headers.get('accept')
        content_md5 = '' if headers.get('content-md5') is None else headers.get('content-md5')
        content_type = '' if headers.get('content-type') is None else headers.get('content-type')
        date = '' if headers.get('date') is None else headers.get('date')

        header = '%s\n%s\n%s\n%s\n%s\n' % (method, accept, content_md5, content_type, date)
        canon_headers = Utils._get_canonicalized_headers(headers)
        canon_resource = Utils._get_canonicalized_resource(pathname, query)
        sign_str = header + canon_headers + canon_resource
        return sign_str

    @staticmethod
    def get_roasignature(string_to_sign, secret):
        """
        Get signature according to stringToSign, secret

        @type string_to_sign: str
        @param string_to_sign:  the signed string

        @type secret: str
        @param secret: accesskey secret

        @return: the signature
        """
        hash_val = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha1).digest()
        signature = base64.b64encode(hash_val).decode('utf-8')
        return signature

    @staticmethod
    def _object_handler(key, value, out):
        if value is None:
            return

        if isinstance(value, dict):
            for k, v in value.items():
                Utils._object_handler('%s.%s' % (key, k), v, out)
        elif isinstance(value, DaraModel):
            for k, v in value.to_map().items():
                Utils._object_handler('%s.%s' % (key, k), v, out)
        elif isinstance(value, (list, tuple)):
            for index, val in enumerate(value):
                Utils._object_handler('%s.%s' % (key, index + 1), val, out)
        else:
            if key.startswith('.'):
                key = key[1:]
            if isinstance(value, bytes):
                out[key] = str(value, encoding='utf-8')
            elif not isinstance(value, STREAM_CLASS):
                out[key] = str(value)

    @staticmethod
    def to_form(filter):
        """
        Parse filter into a form string

        The filter is flattened into dotted keys first and the result is then
        handed to Util.anyify_map_value and Util.to_form_string.

        @type filter: dict
        @param filter: object

        @return: the string
        """
        flattened = {}
        if filter:
            Utils._object_handler('', filter, flattened)
        anyified = Util.anyify_map_value(flattened)
        return Util.to_form_string(anyified)

    @staticmethod
    def get_timestamp():
        """
        Get timestamp

        @return: the timestamp string
        """
        return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def query(filter):
        """
        Parse filter into a object which's type is map[string]string

        @type filter: dict
        @param filter: query param

        @return: the object
        """
        out_dict = {}
        if filter:
            Utils._object_handler('', filter, out_dict)
        return out_dict

    @staticmethod
    def get_rpcsignature(signed_params, method, secret):
        """
        Get signature according to signedParams, method and secret

        @type signed_params: dict
        @param signed_params: params which need to be signed

        @type method: str
        @param method: http method e.g. GET

        @type secret: str
        @param secret: AccessKeySecret

        @return: the signature
        """
        queries = signed_params.copy()
        keys = list(queries.keys())
        keys.sort()

        canonicalized_query_string = ""

        for k in keys:
            if queries[k] is not None:
                canonicalized_query_string += f'&{quote(k, safe="~", encoding="utf-8")}=' \
                                              f'{quote(queries[k], safe="~", encoding="utf-8")}'

        string_to_sign = f'{method}&%2F&{quote_plus(canonicalized_query_string[1:], safe="~", encoding="utf-8")}'

        digest_maker = hmac.new(bytes(secret + '&', encoding="utf-8"),
                                bytes(string_to_sign, encoding="utf-8"),
                                digestmod=hashlib.sha1)
        hash_bytes = digest_maker.digest()
        signed_str = str(base64.b64encode(hash_bytes), encoding="utf-8")

        return signed_str

    @staticmethod
    def array_to_string_with_specified_style(array, prefix, style):
        """
        Parse array into a string with specified style

        @type array: any
        @param array: the array

        @type prefix: str
        @param prefix: the prefix string

        @param style: specified style e.g. repeatList

        @return: the string
        """
        if array is None:
            return ''

        if style == 'repeatList':
            return Utils._flat_repeat_list({prefix: array})
        elif style == 'simple':
            return ','.join(map(str, array))
        elif style == 'spaceDelimited':
            return ' '.join(map(str, array))
        elif style == 'pipeDelimited':
            return '|'.join(map(str, array))
        elif style == 'json':
            return Util.to_jsonstring(Utils._parse_to_dict(array))
        else:
            return ''

    @staticmethod
    def _flat_repeat_list(dic):
        query = {}
        if dic:
            Utils._object_handler('', dic, query)

        l = []
        q = sorted(query)
        for i in q:
            k = quote_plus(i, encoding='utf-8')
            v = quote_plus(query[i], encoding='utf-8')
            l.append(k + '=' + v)
        return '&&'.join(l)

    @staticmethod
    def parse_to_map(inp):
        """
        Transform input as map.
        """
        try:
            result = Utils._parse_to_dict(inp)
            return copy.deepcopy(result)
        except TypeError:
            return

    @staticmethod
    def _parse_to_dict(val):
        if isinstance(val, dict):
            result = {}
            for k, v in val.items():
                if isinstance(v, (list, dict, DaraModel)):
                    result[k] = Utils._parse_to_dict(v)
                else:
                    result[k] = v
            return result
        elif isinstance(val, list):
            result = []
            for i in val:
                if isinstance(i, (list, dict, DaraModel)):
                    result.append(Utils._parse_to_dict(i))
                else:
                    result.append(i)
            return result
        elif isinstance(val, DaraModel):
            return val.to_map()

    @staticmethod
    def get_endpoint(endpoint, server_use, endpoint_type):
        """
        If endpointType is internal, use internal endpoint
        If serverUse is true and endpointType is accelerate, use accelerate endpoint
        Default return endpoint
        @param server_use whether use accelerate endpoint
        @param endpoint_type value must be internal or accelerate
        @return the final endpoint
        """
        if endpoint_type == "internal":
            str_split = endpoint.split('.')
            str_split[0] += "-internal"
            endpoint = ".".join(str_split)

        if server_use and endpoint_type == "accelerate":
            return "oss-accelerate.aliyuncs.com"

        return endpoint

    @staticmethod
    def hash(raw, sign_type):
        if sign_type == 'ACS3-HMAC-SHA256' or sign_type == 'ACS3-RSA-SHA256':
            return hashlib.sha256(raw).digest()
        elif sign_type == 'ACS3-HMAC-SM3':
            return hash_sm3(raw)

    @staticmethod
    def hex_encode(raw):
        if raw:
            return binascii.b2a_hex(raw).decode('utf-8')

    @staticmethod
    def get_authorization(request, sign_type, payload, ak, secret):
        """
        Build the authorization value for a request

        A canonical request is assembled from the method, path ('/' when the
        path is empty), canonical query string, canonical headers, signed
        header list and payload. Its hex-encoded hash is prefixed with
        sign_type to form the string to sign, which is then signed with secret.

        @param request: object exposing method, pathname, query and headers

        @param sign_type: the signature algorithm name

        @param payload: the payload text placed last in the canonical request

        @param ak: the access key id placed in the Credential field

        @param secret: the signing secret

        @return: '<sign_type> Credential=...,SignedHeaders=...,Signature=...'
        """
        canonical_uri = request.pathname or '/'
        canonicalized_query = get_canonical_query_string(request.query)
        canonicalized_headers, signed_headers = get_canonicalized_headers(request.headers)

        canonical_request = (
            f'{request.method}\n'
            f'{canonical_uri}\n'
            f'{canonicalized_query}\n'
            f'{canonicalized_headers}\n'
            f'{signed_headers}\n'
            f'{payload}'
        )

        request_digest = Utils.hex_encode(Utils.hash(canonical_request.encode("utf-8"), sign_type))
        str_to_sign = f'{sign_type}\n{request_digest}'
        signature = Utils.hex_encode(signature_method(secret, str_to_sign, sign_type))
        return f'{sign_type} Credential={ak},SignedHeaders={signed_headers},Signature={signature}'

    @staticmethod
    def get_encode_path(path):
        return quote(path, safe='/~', encoding="utf-8")

    @staticmethod
    def get_encode_param(param):
        return quote(param, safe='~', encoding="utf-8")


    @staticmethod
    def get_nonce() -> str:
        """
        Generate a nonce string

        The nonce is the MD5 hex digest of the module's process start time,
        the current thread id, the current time in milliseconds, a
        per-process sequence number and 64 random bits.

        @return: the nonce string
        """
        global _seqId
        thread_id = threading.get_ident()
        current_time = int(time.time() * 1000)
        seq = _seqId
        _seqId += 1
        rand_num = random.getrandbits(64)
        # Read the module-level _process_start_time. Assigning to that name
        # here would create a local that merely repeats current_time, so the
        # process start time would never reach the nonce.
        msg = f'{_process_start_time}-{thread_id}-{current_time}-{seq}-{rand_num}'
        return hashlib.md5(msg.encode('utf-8')).hexdigest()

    @staticmethod
    def get_date_utcstring() -> str:
        """
        Get an UTC format string by current date, e.g. 'Thu, 06 Feb 2020 07:32:54 GMT'
        @return: the UTC format string
        """
        return datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')

    @staticmethod
    def stringify_map_value(
        m: Dict[str, Any],
    ) -> Dict[str, str]:
        """
        Stringify the value of map
        @return: the new stringified map
        """
        if m is None:
            return {}

        dic_result = {}
        for k, v in m.items():
            if v is not None:
                if isinstance(v, bytes):
                    v = v.decode('utf-8')
                else:
                    v = str(v)
            dic_result[k] = v
        return dic_result

    @staticmethod
    def to_array(
        input: Any,
    ) -> List[Dict[str, Any]]:
        """
        Transform input as array.
        """
        if input is None:
            return []

        out = []
        for i in input:
            if isinstance(i, DaraModel):
                out.append(i.to_map())
            else:
                out.append(i)
        return out

    @staticmethod
    def __get_default_agent():
        """
        Build the default user agent from the platform, Python and Tea versions
        @return: the default user agent string
        """
        platform_info = f'{platform.system()}; {platform.machine()}'
        return f'AlibabaCloud ({platform_info}) ' \
               f'Python/{platform.python_version()} Core/{Tea.__version__} TeaDSL/2'
               
    @staticmethod
    def get_user_agent(
        user_agent: str,
    ) -> str:
        """
        Get user agent: if userAgent is not empty, append it to the default user agent and return the result, otherwise return the default user agent
        @return: the string value
        """
        default_agent = Utils.__get_default_agent()
        return f'{default_agent} {user_agent}' if user_agent else default_agent

    @staticmethod
    def get_endpoint_rules(product, region_id, endpoint_type, network, suffix=None):
        product = product or ""
        network = network or ""
        if endpoint_type == "regional":
            if region_id is None or region_id == "":
                raise RuntimeError(
                    "RegionId is empty, please set a valid RegionId")
            result = "<product><network>.<region_id>.aliyuncs.com".replace(
                "<region_id>", region_id)
        else:
            result = "<product><network>.aliyuncs.com"

        result = result.replace("<product>", product.lower())
        if network == "" or network == "public":
            result = result.replace("<network>", "")
        else:
            result = result.replace("<network>", "-"+network)
        return result
    
    
    @staticmethod
    def get_throttling_time_left(headers: Dict[str, str]) -> int:
        """
        Get throttling time left based on the response headers
        
        @param headers: The response headers
        @return: Remaining time in milliseconds before the throttle is lifted
        """
        rate_limit_user_api = headers.get("x-ratelimit-user-api")
        rate_limit_user = headers.get("x-ratelimit-user")

        time_left_user_api = Utils._get_time_left(rate_limit_user_api)
        time_left_user = Utils._get_time_left(rate_limit_user)

        return max(time_left_user_api, time_left_user)

    @staticmethod
    def _get_time_left(rate_limit: str) -> int:
        """
        Extract time left from rate limit string
        
        @param rate_limit: Rate limit string from headers
        @return: Time left in milliseconds
        """
        if rate_limit:
            pairs = rate_limit.split(',')
            for pair in pairs:
                key, value = pair.split(':')
                if key.strip() == 'TimeLeft':
                    return int(value.strip())
        return 0

    @staticmethod
    def flat_map(params: Dict[str, Any], prefix: str = '') -> Dict[str, str]:
        """
        Flatten the dictionary with a given prefix
        
        @param params: Dictionary to flatten
        @param prefix: Prefix for keys in the flattened dictionary
        @return: A flattened dictionary
        """
        flat_result = {}

        def _flatten(current_params, current_prefix):
            if isinstance(current_params, dict):
                for k, v in current_params.items():
                    new_key = f"{current_prefix}.{k}" if current_prefix else k
                    _flatten(v, new_key)
            elif isinstance(current_params, list):
                for index, item in enumerate(current_params):
                    new_key = f"{current_prefix}.{index + 1}"
                    _flatten(item, new_key)
            else:
                flat_result[current_prefix] = str(current_params)

        _flatten(params, prefix)
        return flat_result

    @staticmethod
    def map_to_flat_style(input: Any) -> Any:
        """
        Convert input to a flat style
        
        @param input: Input to convert
        @return: A flat representation of the input
        """
        if isinstance(input, dict):
            return Utils.flat_map(input)
        elif isinstance(input, list):
            flat_result = {}
            for index, item in enumerate(input):
                flat_result[index + 1] = str(item)
            return flat_result
        else:
            return str(input)