# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
from Tea.exceptions import TeaException
from alibabacloud_darabonba_encode_util.encoder import Encoder
from alibabacloud_darabonba_signature_util.signer import Signer
from typing import Dict, Any, List
from Tea.core import TeaCore

from alibabacloud_gateway_spi.client import Client as SPIClient
from alibabacloud_gateway_spi import models as spi_models
from alibabacloud_darabonba_string.client import Client as StringClient
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_endpoint_util.client import Client as EndpointUtilClient
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
from alibabacloud_darabonba_array.client import Client as ArrayClient
from alibabacloud_darabonba_map.client import Client as MapClient
from alibabacloud_gateway_fc import models as gateway_fc_models
from alibabacloud_credentials.client import Client as CredentialClient


class Client(SPIClient):
    def __init__(self):
        super().__init__()

    def modify_configuration(
        self,
        context: spi_models.InterceptorContext,
        attribute_map: spi_models.AttributeMap,
    ) -> None:
        request = context.request
        config = context.configuration
        config.endpoint = self.get_endpoint(request.product_id, config.region_id, config.endpoint_rule, config.network, config.suffix, config.endpoint_map, config.endpoint)

    async def modify_configuration_async(
        self,
        context: spi_models.InterceptorContext,
        attribute_map: spi_models.AttributeMap,
    ) -> None:
        request = context.request
        config = context.configuration
        config.endpoint = self.get_endpoint(request.product_id, config.region_id, config.endpoint_rule, config.network, config.suffix, config.endpoint_map, config.endpoint)

    def modify_request(
        self,
        context: spi_models.InterceptorContext,
        attribute_map: spi_models.AttributeMap,
    ) -> None:
        config = context.configuration
        if not StringClient.has_suffix(config.endpoint, 'aliyuncs.com'):
            self.sign_request_for_fc(context)
        else:
            self.sign_request_for_pop(context)

    async def modify_request_async(
        self,
        context: spi_models.InterceptorContext,
        attribute_map: spi_models.AttributeMap,
    ) -> None:
        config = context.configuration
        if not StringClient.has_suffix(config.endpoint, 'aliyuncs.com'):
            await self.sign_request_for_fc_async(context)
        else:
            await self.sign_request_for_pop_async(context)

    def modify_response(
        self,
        context: spi_models.InterceptorContext,
        attribute_map: spi_models.AttributeMap,
    ) -> None:
        request = context.request
        config = context.configuration
        response = context.response
        if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code):
            if StringClient.has_prefix(config.endpoint, 'fc.') and StringClient.has_suffix(config.endpoint, '.aliyuncs.com'):
                pop_res = UtilClient.read_as_json(response.body)
                pop_err = UtilClient.assert_as_map(pop_res)
                raise TeaException({
                    'code': f"{self.default_any(pop_err.get('Code'), pop_err.get('code'))}",
                    'message': f"code: {response.status_code}, {self.default_any(pop_err.get('Message'), pop_err.get('message'))} request id: {self.default_any(pop_err.get('RequestID'), pop_err.get('RequestId'))}",
                    'data': pop_err
                })
            else:
                _headers = UtilClient.assert_as_map(response.headers)
                fc_res = UtilClient.read_as_json(response.body)
                fc_err = UtilClient.assert_as_map(fc_res)
                raise TeaException({
                    'code': fc_err.get('ErrorCode'),
                    'message': f"code: {response.status_code}, {fc_err.get('ErrorMessage')} request id: {_headers.get('x-fc-request-id')}",
                    'data': fc_err
                })
        if UtilClient.equal_string(request.body_type, 'binary'):
            response.deserialized_body = response.body
        elif UtilClient.equal_string(request.body_type, 'byte'):
            byt = UtilClient.read_as_bytes(response.body)
            response.deserialized_body = byt
        elif UtilClient.equal_string(request.body_type, 'string'):
            str = UtilClient.read_as_string(response.body)
            response.deserialized_body = str
        elif UtilClient.equal_string(request.body_type, 'json'):
            obj = UtilClient.read_as_json(response.body)
            res = UtilClient.assert_as_map(obj)
            response.deserialized_body = res
        elif UtilClient.equal_string(request.body_type, 'array'):
            arr = UtilClient.read_as_json(response.body)
            response.deserialized_body = arr
        else:
            response.deserialized_body = UtilClient.read_as_string(response.body)

    async def modify_response_async(
        self,
        context: spi_models.InterceptorContext,
        attribute_map: spi_models.AttributeMap,
    ) -> None:
        request = context.request
        config = context.configuration
        response = context.response
        if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code):
            if StringClient.has_prefix(config.endpoint, 'fc.') and StringClient.has_suffix(config.endpoint, '.aliyuncs.com'):
                pop_res = await UtilClient.read_as_json_async(response.body)
                pop_err = UtilClient.assert_as_map(pop_res)
                raise TeaException({
                    'code': f"{self.default_any(pop_err.get('Code'), pop_err.get('code'))}",
                    'message': f"code: {response.status_code}, {self.default_any(pop_err.get('Message'), pop_err.get('message'))} request id: {self.default_any(pop_err.get('RequestID'), pop_err.get('RequestId'))}",
                    'data': pop_err
                })
            else:
                _headers = UtilClient.assert_as_map(response.headers)
                fc_res = await UtilClient.read_as_json_async(response.body)
                fc_err = UtilClient.assert_as_map(fc_res)
                raise TeaException({
                    'code': fc_err.get('ErrorCode'),
                    'message': f"code: {response.status_code}, {fc_err.get('ErrorMessage')} request id: {_headers.get('x-fc-request-id')}",
                    'data': fc_err
                })
        if UtilClient.equal_string(request.body_type, 'binary'):
            response.deserialized_body = response.body
        elif UtilClient.equal_string(request.body_type, 'byte'):
            byt = await UtilClient.read_as_bytes_async(response.body)
            response.deserialized_body = byt
        elif UtilClient.equal_string(request.body_type, 'string'):
            str = await UtilClient.read_as_string_async(response.body)
            response.deserialized_body = str
        elif UtilClient.equal_string(request.body_type, 'json'):
            obj = await UtilClient.read_as_json_async(response.body)
            res = UtilClient.assert_as_map(obj)
            response.deserialized_body = res
        elif UtilClient.equal_string(request.body_type, 'array'):
            arr = await UtilClient.read_as_json_async(response.body)
            response.deserialized_body = arr
        else:
            response.deserialized_body = await UtilClient.read_as_string_async(response.body)

    def get_endpoint(
        self,
        product_id: str,
        region_id: str,
        endpoint_rule: str,
        network: str,
        suffix: str,
        endpoint_map: Dict[str, str],
        endpoint: str,
    ) -> str:
        if not UtilClient.empty(endpoint):
            return endpoint
        if not UtilClient.is_unset(endpoint_map) and not UtilClient.empty(endpoint_map.get(region_id)):
            return endpoint_map.get(region_id)
        return EndpointUtilClient.get_endpoint_rules(product_id, region_id, endpoint_rule, network, suffix)

    def default_any(
        self,
        input_value: Any,
        default_value: Any,
    ) -> Any:
        if UtilClient.is_unset(input_value):
            return default_value
        return input_value

    def sign_request_for_fc(
        self,
        context: spi_models.InterceptorContext,
    ) -> None:
        request = context.request
        config = context.configuration
        request.headers = TeaCore.merge({
            'host': config.endpoint,
            'date': UtilClient.get_date_utcstring(),
            'accept': 'application/json',
            'user-agent': request.user_agent
        }, request.headers)
        request.headers['content-type'] = 'application/json'
        if not UtilClient.is_unset(request.stream):
            tmp = UtilClient.read_as_bytes(request.stream)
            request.stream = tmp
            request.headers['content-type'] = 'application/octet-stream'
            request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign_for_bytes(tmp))
        else:
            if not UtilClient.is_unset(request.body):
                if UtilClient.equal_string(request.req_body_type, 'json'):
                    json_obj = UtilClient.to_jsonstring(request.body)
                    request.stream = json_obj
                    request.headers['content-type'] = 'application/json'
                    request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign(json_obj))
                else:
                    m = UtilClient.assert_as_map(request.body)
                    form_obj = OpenApiUtilClient.to_form(m)
                    request.stream = form_obj
                    request.headers['content-type'] = 'application/x-www-form-urlencoded'
                    request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign(form_obj))
        credential = request.credential
        access_key_id = credential.get_access_key_id()
        access_key_secret = credential.get_access_key_secret()
        security_token = credential.get_security_token()
        if not UtilClient.empty(security_token):
            request.headers['x-fc-security-token'] = security_token
        request.headers['Authorization'] = self.get_authorization_for_fc(request.pathname, request.method, request.query, request.headers, access_key_id, access_key_secret)

    async def sign_request_for_fc_async(
        self,
        context: spi_models.InterceptorContext,
    ) -> None:
        request = context.request
        config = context.configuration
        request.headers = TeaCore.merge({
            'host': config.endpoint,
            'date': UtilClient.get_date_utcstring(),
            'accept': 'application/json',
            'user-agent': request.user_agent
        }, request.headers)
        request.headers['content-type'] = 'application/json'
        if not UtilClient.is_unset(request.stream):
            tmp = await UtilClient.read_as_bytes_async(request.stream)
            request.stream = tmp
            request.headers['content-type'] = 'application/octet-stream'
            request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign_for_bytes(tmp))
        else:
            if not UtilClient.is_unset(request.body):
                if UtilClient.equal_string(request.req_body_type, 'json'):
                    json_obj = UtilClient.to_jsonstring(request.body)
                    request.stream = json_obj
                    request.headers['content-type'] = 'application/json'
                    request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign(json_obj))
                else:
                    m = UtilClient.assert_as_map(request.body)
                    form_obj = OpenApiUtilClient.to_form(m)
                    request.stream = form_obj
                    request.headers['content-type'] = 'application/x-www-form-urlencoded'
                    request.headers['content-md5'] = Encoder.base_64encode_to_string(Signer.md5sign(form_obj))
        credential = request.credential
        access_key_id = await credential.get_access_key_id_async()
        access_key_secret = await credential.get_access_key_secret_async()
        security_token = await credential.get_security_token_async()
        if not UtilClient.empty(security_token):
            request.headers['x-fc-security-token'] = security_token
        request.headers['Authorization'] = await self.get_authorization_for_fc_async(request.pathname, request.method, request.query, request.headers, access_key_id, access_key_secret)

    def sign_request_for_pop(
        self,
        context: spi_models.InterceptorContext,
    ) -> None:
        request = context.request
        config = context.configuration
        request.headers = TeaCore.merge({
            'host': config.endpoint,
            'x-acs-version': request.version,
            'x-acs-action': request.action,
            'user-agent': request.user_agent,
            'x-acs-date': OpenApiUtilClient.get_timestamp(),
            'x-acs-signature-nonce': UtilClient.get_nonce(),
            'accept': 'application/json'
        }, request.headers)
        signature_algorithm = 'ACS3-HMAC-SHA256'
        if not UtilClient.is_unset(request.signature_algorithm):
            signature_algorithm = request.signature_algorithm
        hashed_request_payload = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(''), signature_algorithm))
        if not UtilClient.is_unset(request.stream):
            tmp = UtilClient.read_as_bytes(request.stream)
            hashed_request_payload = Encoder.hex_encode(Encoder.hash(tmp, signature_algorithm))
            request.stream = tmp
            request.headers['content-type'] = 'application/octet-stream'
        else:
            if not UtilClient.is_unset(request.body):
                if UtilClient.equal_string(request.req_body_type, 'json'):
                    json_obj = UtilClient.to_jsonstring(request.body)
                    hashed_request_payload = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(json_obj), signature_algorithm))
                    request.stream = json_obj
                    request.headers['content-type'] = 'application/json; charset=utf-8'
                else:
                    m = UtilClient.assert_as_map(request.body)
                    form_obj = OpenApiUtilClient.to_form(m)
                    hashed_request_payload = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(form_obj), signature_algorithm))
                    request.stream = form_obj
                    request.headers['content-type'] = 'application/x-www-form-urlencoded'
        request.headers['x-acs-content-sha256'] = hashed_request_payload
        if not UtilClient.equal_string(request.auth_type, 'Anonymous'):
            credential = request.credential
            access_key_id = credential.get_access_key_id()
            access_key_secret = credential.get_access_key_secret()
            security_token = credential.get_security_token()
            if not UtilClient.empty(security_token):
                request.headers['x-acs-accesskey-id'] = access_key_id
                request.headers['x-acs-security-token'] = security_token
            request.headers['Authorization'] = self.get_authorization_for_pop(request.pathname, request.method, request.query, request.headers, signature_algorithm, hashed_request_payload, access_key_id, access_key_secret)

    async def sign_request_for_pop_async(
        self,
        context: spi_models.InterceptorContext,
    ) -> None:
        request = context.request
        config = context.configuration
        request.headers = TeaCore.merge({
            'host': config.endpoint,
            'x-acs-version': request.version,
            'x-acs-action': request.action,
            'user-agent': request.user_agent,
            'x-acs-date': OpenApiUtilClient.get_timestamp(),
            'x-acs-signature-nonce': UtilClient.get_nonce(),
            'accept': 'application/json'
        }, request.headers)
        signature_algorithm = 'ACS3-HMAC-SHA256'
        if not UtilClient.is_unset(request.signature_algorithm):
            signature_algorithm = request.signature_algorithm
        hashed_request_payload = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(''), signature_algorithm))
        if not UtilClient.is_unset(request.stream):
            tmp = await UtilClient.read_as_bytes_async(request.stream)
            hashed_request_payload = Encoder.hex_encode(Encoder.hash(tmp, signature_algorithm))
            request.stream = tmp
            request.headers['content-type'] = 'application/octet-stream'
        else:
            if not UtilClient.is_unset(request.body):
                if UtilClient.equal_string(request.req_body_type, 'json'):
                    json_obj = UtilClient.to_jsonstring(request.body)
                    hashed_request_payload = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(json_obj), signature_algorithm))
                    request.stream = json_obj
                    request.headers['content-type'] = 'application/json; charset=utf-8'
                else:
                    m = UtilClient.assert_as_map(request.body)
                    form_obj = OpenApiUtilClient.to_form(m)
                    hashed_request_payload = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(form_obj), signature_algorithm))
                    request.stream = form_obj
                    request.headers['content-type'] = 'application/x-www-form-urlencoded'
        request.headers['x-acs-content-sha256'] = hashed_request_payload
        if not UtilClient.equal_string(request.auth_type, 'Anonymous'):
            credential = request.credential
            access_key_id = await credential.get_access_key_id_async()
            access_key_secret = await credential.get_access_key_secret_async()
            security_token = await credential.get_security_token_async()
            if not UtilClient.empty(security_token):
                request.headers['x-acs-accesskey-id'] = access_key_id
                request.headers['x-acs-security-token'] = security_token
            request.headers['Authorization'] = await self.get_authorization_for_pop_async(request.pathname, request.method, request.query, request.headers, signature_algorithm, hashed_request_payload, access_key_id, access_key_secret)

    def get_authorization_for_fc(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        ak: str,
        secret: str,
    ) -> str:
        sign = self.get_signature_for_fc(pathname, method, query, headers, secret)
        return f'FC {ak}:{sign}'

    async def get_authorization_for_fc_async(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        ak: str,
        secret: str,
    ) -> str:
        sign = await self.get_signature_for_fc_async(pathname, method, query, headers, secret)
        return f'FC {ak}:{sign}'

    def get_signature_for_fc(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        secret: str,
    ) -> str:
        resource = pathname
        content_md_5 = headers.get('content-md5')
        if UtilClient.is_unset(content_md_5):
            content_md_5 = ''
        content_type = headers.get('content-type')
        if UtilClient.is_unset(content_type):
            content_type = ''
        string_to_sign = ''
        canonicalized_resource = self.build_canonicalized_resource_for_fc(resource, query)
        canonicalized_headers = self.build_canonicalized_headers_for_fc(headers)
        string_to_sign = f"{method}\n{content_md_5}\n{content_type}\n{headers.get('date')}\n{canonicalized_headers}{canonicalized_resource}"
        return Encoder.base_64encode_to_string(Signer.hmac_sha256sign(string_to_sign, secret))

    async def get_signature_for_fc_async(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        secret: str,
    ) -> str:
        resource = pathname
        content_md_5 = headers.get('content-md5')
        if UtilClient.is_unset(content_md_5):
            content_md_5 = ''
        content_type = headers.get('content-type')
        if UtilClient.is_unset(content_type):
            content_type = ''
        string_to_sign = ''
        canonicalized_resource = await self.build_canonicalized_resource_for_fc_async(resource, query)
        canonicalized_headers = await self.build_canonicalized_headers_for_fc_async(headers)
        string_to_sign = f"{method}\n{content_md_5}\n{content_type}\n{headers.get('date')}\n{canonicalized_headers}{canonicalized_resource}"
        return Encoder.base_64encode_to_string(Signer.hmac_sha256sign(string_to_sign, secret))

    def build_canonicalized_resource_for_fc(
        self,
        pathname: str,
        query: Dict[str, str],
    ) -> str:
        paths = StringClient.split(pathname, f'?', 2)
        canonicalized_resource = paths[0]
        resources = {}
        if UtilClient.equal_number(ArrayClient.size(paths), 2):
            resources = StringClient.split(paths[1], '&', None)
        sub_resources = {}
        tmp = ''
        separator = ''
        if not UtilClient.is_unset(query):
            query_list = MapClient.key_set(query)
            for param_name in query_list:
                tmp = f'{tmp}{separator}{param_name}'
                if not UtilClient.is_unset(query.get(param_name)):
                    tmp = f'{tmp}={query.get(param_name)}'
                separator = ';'
            sub_resources = StringClient.split(tmp, ';', None)
        result = ArrayClient.concat(sub_resources, resources)
        sorted_params = ArrayClient.asc_sort(result)
        if UtilClient.equal_number(ArrayClient.size(sorted_params), 0):
            return f'{canonicalized_resource}\n'
        sub_res = ArrayClient.join(sorted_params, '\n')
        return f'{canonicalized_resource}\n{sub_res}'

    async def build_canonicalized_resource_for_fc_async(
        self,
        pathname: str,
        query: Dict[str, str],
    ) -> str:
        paths = StringClient.split(pathname, f'?', 2)
        canonicalized_resource = paths[0]
        resources = {}
        if UtilClient.equal_number(ArrayClient.size(paths), 2):
            resources = StringClient.split(paths[1], '&', None)
        sub_resources = {}
        tmp = ''
        separator = ''
        if not UtilClient.is_unset(query):
            query_list = MapClient.key_set(query)
            for param_name in query_list:
                tmp = f'{tmp}{separator}{param_name}'
                if not UtilClient.is_unset(query.get(param_name)):
                    tmp = f'{tmp}={query.get(param_name)}'
                separator = ';'
            sub_resources = StringClient.split(tmp, ';', None)
        result = ArrayClient.concat(sub_resources, resources)
        sorted_params = ArrayClient.asc_sort(result)
        if UtilClient.equal_number(ArrayClient.size(sorted_params), 0):
            return f'{canonicalized_resource}\n'
        sub_res = ArrayClient.join(sorted_params, '\n')
        return f'{canonicalized_resource}\n{sub_res}'

    def build_canonicalized_headers_for_fc(
        self,
        headers: Dict[str, str],
    ) -> str:
        canonicalized_headers = ''
        keys = MapClient.key_set(headers)
        sorted_headers = ArrayClient.asc_sort(keys)
        for header in sorted_headers:
            if StringClient.contains(StringClient.to_lower(header), 'x-fc-'):
                canonicalized_headers = f'{canonicalized_headers}{StringClient.to_lower(header)}:{headers.get(header)}\n'
        return canonicalized_headers

    async def build_canonicalized_headers_for_fc_async(
        self,
        headers: Dict[str, str],
    ) -> str:
        canonicalized_headers = ''
        keys = MapClient.key_set(headers)
        sorted_headers = ArrayClient.asc_sort(keys)
        for header in sorted_headers:
            if StringClient.contains(StringClient.to_lower(header), 'x-fc-'):
                canonicalized_headers = f'{canonicalized_headers}{StringClient.to_lower(header)}:{headers.get(header)}\n'
        return canonicalized_headers

    def get_authorization_for_pop(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        signature_algorithm: str,
        payload: str,
        ak: str,
        secret: str,
    ) -> str:
        signature = self.get_signature_for_pop(pathname, method, query, headers, signature_algorithm, payload, secret)
        signed_headers = self.get_signed_headers(headers)
        return f"{signature_algorithm} Credential={ak},SignedHeaders={ArrayClient.join(signed_headers, ';')},Signature={signature}"

    async def get_authorization_for_pop_async(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        signature_algorithm: str,
        payload: str,
        ak: str,
        secret: str,
    ) -> str:
        signature = await self.get_signature_for_pop_async(pathname, method, query, headers, signature_algorithm, payload, secret)
        signed_headers = await self.get_signed_headers_async(headers)
        return f"{signature_algorithm} Credential={ak},SignedHeaders={ArrayClient.join(signed_headers, ';')},Signature={signature}"

    def get_signature_for_pop(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        signature_algorithm: str,
        payload: str,
        secret: str,
    ) -> str:
        canonical_uri = '/'
        if not UtilClient.empty(pathname):
            canonical_uri = pathname
        string_to_sign = ''
        canonicalized_resource = self.build_canonicalized_resource_for_pop(query)
        canonicalized_headers = self.build_canonicalized_headers_for_pop(headers)
        signed_headers = self.get_signed_headers(headers)
        string_to_sign = f"{method}\n{canonical_uri}\n{canonicalized_resource}\n{canonicalized_headers}\n{ArrayClient.join(signed_headers, ';')}\n{payload}"
        hex = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(string_to_sign), signature_algorithm))
        string_to_sign = f'{signature_algorithm}\n{hex}'
        signature = UtilClient.to_bytes('')
        if StringClient.equals(signature_algorithm, 'ACS3-HMAC-SHA256'):
            signature = Signer.hmac_sha256sign(string_to_sign, secret)
        elif StringClient.equals(signature_algorithm, 'ACS3-HMAC-SM3'):
            signature = Signer.hmac_sm3sign(string_to_sign, secret)
        elif StringClient.equals(signature_algorithm, 'ACS3-RSA-SHA256'):
            signature = Signer.sha256with_rsasign(string_to_sign, secret)
        return Encoder.hex_encode(signature)

    async def get_signature_for_pop_async(
        self,
        pathname: str,
        method: str,
        query: Dict[str, str],
        headers: Dict[str, str],
        signature_algorithm: str,
        payload: str,
        secret: str,
    ) -> str:
        canonical_uri = '/'
        if not UtilClient.empty(pathname):
            canonical_uri = pathname
        string_to_sign = ''
        canonicalized_resource = await self.build_canonicalized_resource_for_pop_async(query)
        canonicalized_headers = await self.build_canonicalized_headers_for_pop_async(headers)
        signed_headers = await self.get_signed_headers_async(headers)
        string_to_sign = f"{method}\n{canonical_uri}\n{canonicalized_resource}\n{canonicalized_headers}\n{ArrayClient.join(signed_headers, ';')}\n{payload}"
        hex = Encoder.hex_encode(Encoder.hash(UtilClient.to_bytes(string_to_sign), signature_algorithm))
        string_to_sign = f'{signature_algorithm}\n{hex}'
        signature = UtilClient.to_bytes('')
        if StringClient.equals(signature_algorithm, 'ACS3-HMAC-SHA256'):
            signature = Signer.hmac_sha256sign(string_to_sign, secret)
        elif StringClient.equals(signature_algorithm, 'ACS3-HMAC-SM3'):
            signature = Signer.hmac_sm3sign(string_to_sign, secret)
        elif StringClient.equals(signature_algorithm, 'ACS3-RSA-SHA256'):
            signature = Signer.sha256with_rsasign(string_to_sign, secret)
        return Encoder.hex_encode(signature)

    def build_canonicalized_resource_for_pop(
        self,
        query: Dict[str, str],
    ) -> str:
        canonicalized_resource = ''
        if not UtilClient.is_unset(query):
            query_array = MapClient.key_set(query)
            sorted_query_array = ArrayClient.asc_sort(query_array)
            separator = ''
            for key in sorted_query_array:
                canonicalized_resource = f'{canonicalized_resource}{separator}{Encoder.percent_encode(key)}'
                if not UtilClient.empty(query.get(key)):
                    canonicalized_resource = f'{canonicalized_resource}={Encoder.percent_encode(query.get(key))}'
                separator = '&'
        return canonicalized_resource

    async def build_canonicalized_resource_for_pop_async(
        self,
        query: Dict[str, str],
    ) -> str:
        canonicalized_resource = ''
        if not UtilClient.is_unset(query):
            query_array = MapClient.key_set(query)
            sorted_query_array = ArrayClient.asc_sort(query_array)
            separator = ''
            for key in sorted_query_array:
                canonicalized_resource = f'{canonicalized_resource}{separator}{Encoder.percent_encode(key)}'
                if not UtilClient.empty(query.get(key)):
                    canonicalized_resource = f'{canonicalized_resource}={Encoder.percent_encode(query.get(key))}'
                separator = '&'
        return canonicalized_resource

    def build_canonicalized_headers_for_pop(
        self,
        headers: Dict[str, str],
    ) -> str:
        canonicalized_headers = ''
        sorted_headers = self.get_signed_headers(headers)
        for header in sorted_headers:
            canonicalized_headers = f'{canonicalized_headers}{header}:{StringClient.trim(headers.get(header))}\n'
        return canonicalized_headers

    async def build_canonicalized_headers_for_pop_async(
        self,
        headers: Dict[str, str],
    ) -> str:
        canonicalized_headers = ''
        sorted_headers = await self.get_signed_headers_async(headers)
        for header in sorted_headers:
            canonicalized_headers = f'{canonicalized_headers}{header}:{StringClient.trim(headers.get(header))}\n'
        return canonicalized_headers

    def get_signed_headers(
        self,
        headers: Dict[str, str],
    ) -> List[str]:
        headers_array = MapClient.key_set(headers)
        sorted_headers_array = ArrayClient.asc_sort(headers_array)
        tmp = ''
        separator = ''
        for key in sorted_headers_array:
            lower_key = StringClient.to_lower(key)
            if StringClient.has_prefix(lower_key, 'x-acs-') or StringClient.equals(lower_key, 'host') or StringClient.equals(lower_key, 'content-type'):
                if not StringClient.contains(tmp, lower_key):
                    tmp = f'{tmp}{separator}{lower_key}'
                    separator = ';'
        return StringClient.split(tmp, ';', None)

    async def get_signed_headers_async(
        self,
        headers: Dict[str, str],
    ) -> List[str]:
        headers_array = MapClient.key_set(headers)
        sorted_headers_array = ArrayClient.asc_sort(headers_array)
        tmp = ''
        separator = ''
        for key in sorted_headers_array:
            lower_key = StringClient.to_lower(key)
            if StringClient.has_prefix(lower_key, 'x-acs-') or StringClient.equals(lower_key, 'host') or StringClient.equals(lower_key, 'content-type'):
                if not StringClient.contains(tmp, lower_key):
                    tmp = f'{tmp}{separator}{lower_key}'
                    separator = ';'
        return StringClient.split(tmp, ';', None)

    def sign_request(
        self,
        request: gateway_fc_models.HttpRequest,
        credential: CredentialClient,
    ) -> Dict[str, Any]:
        http_request = gateway_fc_models.HttpRequest(
            method=request.method,
            path=request.path,
            headers=request.headers,
            body=request.body,
            req_body_type=request.req_body_type
        )
        http_request.headers['date'] = UtilClient.get_date_utcstring()
        http_request.headers['accept'] = 'application/json'
        http_request.headers['content-type'] = 'application/json'
        if not UtilClient.is_unset(request.body):
            if UtilClient.equal_string(request.req_body_type, 'json'):
                http_request.headers['content-type'] = 'application/json'
            elif UtilClient.equal_string(request.req_body_type, 'form'):
                http_request.headers['content-type'] = 'application/x-www-form-urlencoded'
            elif UtilClient.equal_string(request.req_body_type, 'binary'):
                http_request.headers['content-type'] = 'application/octet-stream'
        access_key_id = credential.get_access_key_id()
        access_key_secret = credential.get_access_key_secret()
        security_token = credential.get_security_token()
        if not UtilClient.empty(security_token):
            http_request.headers['x-fc-security-token'] = security_token
        resource = request.path
        content_md_5 = http_request.headers.get('content-md5')
        if UtilClient.is_unset(content_md_5):
            content_md_5 = ''
        content_type = http_request.headers.get('content-type')
        if UtilClient.is_unset(content_type):
            content_type = ''
        string_to_sign = ''
        canonicalized_resource = self.build_canonicalized_resource(resource)
        canonicalized_headers = self.build_canonicalized_headers(http_request.headers)
        string_to_sign = f"{request.method}\n{UtilClient.to_jsonstring(content_md_5)}\n{UtilClient.to_jsonstring(content_type)}\n{UtilClient.to_jsonstring(http_request.headers.get('date'))}\n{canonicalized_headers}{canonicalized_resource}"
        signature = Encoder.base_64encode_to_string(Signer.hmac_sha256sign(string_to_sign, access_key_secret))
        http_request.headers['Authorization'] = f'FC {access_key_id}:{signature}'
        return http_request.headers

    async def sign_request_async(
        self,
        request: gateway_fc_models.HttpRequest,
        credential: CredentialClient,
    ) -> Dict[str, Any]:
        http_request = gateway_fc_models.HttpRequest(
            method=request.method,
            path=request.path,
            headers=request.headers,
            body=request.body,
            req_body_type=request.req_body_type
        )
        http_request.headers['date'] = UtilClient.get_date_utcstring()
        http_request.headers['accept'] = 'application/json'
        http_request.headers['content-type'] = 'application/json'
        if not UtilClient.is_unset(request.body):
            if UtilClient.equal_string(request.req_body_type, 'json'):
                http_request.headers['content-type'] = 'application/json'
            elif UtilClient.equal_string(request.req_body_type, 'form'):
                http_request.headers['content-type'] = 'application/x-www-form-urlencoded'
            elif UtilClient.equal_string(request.req_body_type, 'binary'):
                http_request.headers['content-type'] = 'application/octet-stream'
        access_key_id = await credential.get_access_key_id_async()
        access_key_secret = await credential.get_access_key_secret_async()
        security_token = await credential.get_security_token_async()
        if not UtilClient.empty(security_token):
            http_request.headers['x-fc-security-token'] = security_token
        resource = request.path
        content_md_5 = http_request.headers.get('content-md5')
        if UtilClient.is_unset(content_md_5):
            content_md_5 = ''
        content_type = http_request.headers.get('content-type')
        if UtilClient.is_unset(content_type):
            content_type = ''
        string_to_sign = ''
        canonicalized_resource = await self.build_canonicalized_resource_async(resource)
        canonicalized_headers = await self.build_canonicalized_headers_async(http_request.headers)
        string_to_sign = f"{request.method}\n{UtilClient.to_jsonstring(content_md_5)}\n{UtilClient.to_jsonstring(content_type)}\n{UtilClient.to_jsonstring(http_request.headers.get('date'))}\n{canonicalized_headers}{canonicalized_resource}"
        signature = Encoder.base_64encode_to_string(Signer.hmac_sha256sign(string_to_sign, access_key_secret))
        http_request.headers['Authorization'] = f'FC {access_key_id}:{signature}'
        return http_request.headers

    def build_canonicalized_resource(
        self,
        pathname: str,
    ) -> str:
        paths = StringClient.split(pathname, f'?', 2)
        canonicalized_resource = paths[0]
        resources = {}
        if UtilClient.equal_number(ArrayClient.size(paths), 2):
            resources = StringClient.split(paths[1], '&', None)
        sorted_params = ArrayClient.asc_sort(resources)
        if UtilClient.equal_number(ArrayClient.size(sorted_params), 0):
            return f'{canonicalized_resource}\n'
        sub_resources = ArrayClient.join(sorted_params, '\n')
        return f'{canonicalized_resource}\n{sub_resources}'

    async def build_canonicalized_resource_async(
        self,
        pathname: str,
    ) -> str:
        paths = StringClient.split(pathname, f'?', 2)
        canonicalized_resource = paths[0]
        resources = {}
        if UtilClient.equal_number(ArrayClient.size(paths), 2):
            resources = StringClient.split(paths[1], '&', None)
        sorted_params = ArrayClient.asc_sort(resources)
        if UtilClient.equal_number(ArrayClient.size(sorted_params), 0):
            return f'{canonicalized_resource}\n'
        sub_resources = ArrayClient.join(sorted_params, '\n')
        return f'{canonicalized_resource}\n{sub_resources}'

    def build_canonicalized_headers(
        self,
        headers: Dict[str, Any],
    ) -> str:
        canonicalized_headers = ''
        keys = MapClient.key_set(headers)
        sorted_headers = ArrayClient.asc_sort(keys)
        for header in sorted_headers:
            if StringClient.contains(StringClient.to_lower(header), 'x-fc-'):
                canonicalized_headers = f'{canonicalized_headers}{StringClient.to_lower(header)}:{UtilClient.to_jsonstring(headers.get(header))}\n'
        return canonicalized_headers

    async def build_canonicalized_headers_async(
        self,
        headers: Dict[str, Any],
    ) -> str:
        canonicalized_headers = ''
        keys = MapClient.key_set(headers)
        sorted_headers = ArrayClient.asc_sort(keys)
        for header in sorted_headers:
            if StringClient.contains(StringClient.to_lower(header), 'x-fc-'):
                canonicalized_headers = f'{canonicalized_headers}{StringClient.to_lower(header)}:{UtilClient.to_jsonstring(headers.get(header))}\n'
        return canonicalized_headers
