alibabacloud-gateway-dingtalk/python/alibabacloud_gateway_dingtalk/client.py (135 lines of code) (raw):

# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. from Tea.exceptions import TeaException from Tea.core import TeaCore from typing import Any from alibabacloud_gateway_spi.client import Client as SPIClient from alibabacloud_gateway_spi import models as spi_models from alibabacloud_tea_util.client import Client as UtilClient class Client(SPIClient): def __init__(self): super().__init__() def modify_configuration( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: pass async def modify_configuration_async( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: pass def modify_request( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request config = context.configuration request.headers = TeaCore.merge({ 'host': config.endpoint, 'user-agent': request.user_agent, 'accept': 'application/json' }, request.headers) if not UtilClient.is_unset(request.body): json_obj = UtilClient.to_jsonstring(request.body) request.stream = json_obj request.headers['content-type'] = 'application/json; charset=utf-8' async def modify_request_async( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request config = context.configuration request.headers = TeaCore.merge({ 'host': config.endpoint, 'user-agent': request.user_agent, 'accept': 'application/json' }, request.headers) if not UtilClient.is_unset(request.body): json_obj = UtilClient.to_jsonstring(request.body) request.stream = json_obj request.headers['content-type'] = 'application/json; charset=utf-8' def modify_response( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request response = context.response if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code): _res = UtilClient.read_as_json(response.body) err = UtilClient.assert_as_map(_res) err['statusCode'] = response.status_code raise TeaException({ 'code': f"{self.default_any(err.get('Code'), err.get('code'))}", 'message': f"code: {response.status_code}, {self.default_any(err.get('Message'), err.get('message'))} request id: {self.default_any(err.get('RequestId'), err.get('requestid'))}", 'data': err, 'description': f"{self.default_any(err.get('Description'), err.get('description'))}", 'accessDeniedDetail': self.default_any(err.get('AccessDeniedDetail'), err.get('accessdenieddetail')) }) if UtilClient.equal_number(response.status_code, 204): UtilClient.read_as_string(response.body) elif UtilClient.equal_string(request.body_type, 'binary'): response.deserialized_body = response.body elif UtilClient.equal_string(request.body_type, 'byte'): byt = UtilClient.read_as_bytes(response.body) response.deserialized_body = byt elif UtilClient.equal_string(request.body_type, 'string'): str = UtilClient.read_as_string(response.body) response.deserialized_body = str elif UtilClient.equal_string(request.body_type, 'json'): obj = UtilClient.read_as_json(response.body) res = UtilClient.assert_as_map(obj) response.deserialized_body = res elif UtilClient.equal_string(request.body_type, 'array'): arr = UtilClient.read_as_json(response.body) response.deserialized_body = arr else: response.deserialized_body = UtilClient.read_as_string(response.body) async def modify_response_async( self, context: spi_models.InterceptorContext, attribute_map: spi_models.AttributeMap, ) -> None: request = context.request response = context.response if UtilClient.is_4xx(response.status_code) or UtilClient.is_5xx(response.status_code): _res = await UtilClient.read_as_json_async(response.body) err = UtilClient.assert_as_map(_res) err['statusCode'] = response.status_code raise TeaException({ 'code': f"{self.default_any(err.get('Code'), err.get('code'))}", 'message': f"code: {response.status_code}, {self.default_any(err.get('Message'), err.get('message'))} request id: {self.default_any(err.get('RequestId'), err.get('requestid'))}", 'data': err, 'description': f"{self.default_any(err.get('Description'), err.get('description'))}", 'accessDeniedDetail': self.default_any(err.get('AccessDeniedDetail'), err.get('accessdenieddetail')) }) if UtilClient.equal_number(response.status_code, 204): await UtilClient.read_as_string_async(response.body) elif UtilClient.equal_string(request.body_type, 'binary'): response.deserialized_body = response.body elif UtilClient.equal_string(request.body_type, 'byte'): byt = await UtilClient.read_as_bytes_async(response.body) response.deserialized_body = byt elif UtilClient.equal_string(request.body_type, 'string'): str = await UtilClient.read_as_string_async(response.body) response.deserialized_body = str elif UtilClient.equal_string(request.body_type, 'json'): obj = await UtilClient.read_as_json_async(response.body) res = UtilClient.assert_as_map(obj) response.deserialized_body = res elif UtilClient.equal_string(request.body_type, 'array'): arr = await UtilClient.read_as_json_async(response.body) response.deserialized_body = arr else: response.deserialized_body = await UtilClient.read_as_string_async(response.body) def default_any( self, input_value: Any, default_value: Any, ) -> Any: if UtilClient.is_unset(input_value): return default_value return input_value