gateway-iot-edge/python/alicloud_gateway_iot_edge/client.py (146 lines of code) (raw):
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import time
from Tea.request import TeaRequest
from Tea.exceptions import TeaException, UnretryableException
from Tea.core import TeaCore
from alicloud_gateway_iot_edge import models as gateway_io_tedge_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_url.client import Client as UrlClient
from alibabacloud_darabonba_number.client import Client as NumberClient
from alibabacloud_tea_util.client import Client as UtilClient
class Client:
_app_key: str = None
_app_secret: str = None
_endpoint: str = None
def __init__(
self,
config: gateway_io_tedge_models.Config,
):
self._app_key = config.app_key
self._app_secret = config.app_secret
self._endpoint = config.endpoint
def execute(
self,
request: gateway_io_tedge_models.ApiRequest,
runtime: util_models.RuntimeOptions,
) -> str:
request.validate()
runtime.validate()
_runtime = {
'timeouted': 'retry',
'readTimeout': runtime.read_timeout,
'connectTimeout': runtime.connect_timeout,
'maxIdleConns': runtime.max_idle_conns,
'retry': {
'retryable': runtime.autoretry,
'maxAttempts': runtime.max_attempts
},
'ignoreSSL': runtime.ignore_ssl
}
_last_request = None
_last_exception = None
_now = time.time()
_retry_times = 0
while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now):
if _retry_times > 0:
_backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times)
if _backoff_time > 0:
TeaCore.sleep(_backoff_time)
_retry_times = _retry_times + 1
try:
_request = TeaRequest()
_request.method = 'POST'
url = UrlClient.parse_url(self._endpoint)
_request.protocol = url.scheme
_request.port = NumberClient.parse_int(url.host.port)
_request.pathname = f'{UtilClient.default_string(request.pathname, url.path.pathname)}/{request.action}'
_request.headers = TeaCore.merge({
'host': url.host.hostname,
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}, request.headers)
params = TeaCore.merge({
'AppKey': self._app_key,
'Action': request.action
}, request.body)
_request.body = UtilClient.to_jsonstring(params)
_last_request = _request
_response = TeaCore.do_action(_request, _runtime)
if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code):
res = UtilClient.read_as_json(_response.body)
err = UtilClient.assert_as_map(res)
err['statusCode'] = _response.status_code
raise TeaException({
'code': f"{err.get('Code')}",
'message': f"statusCode: {err.get('statusCode')}, errorMessage: {err.get('ErrorMessage')}, requestId: {err.get('RequestId')}",
'data': err
})
return UtilClient.read_as_string(_response.body)
except Exception as e:
if TeaCore.is_retryable(e):
_last_exception = e
continue
raise e
raise UnretryableException(_last_request, _last_exception)
async def execute_async(
self,
request: gateway_io_tedge_models.ApiRequest,
runtime: util_models.RuntimeOptions,
) -> str:
request.validate()
runtime.validate()
_runtime = {
'timeouted': 'retry',
'readTimeout': runtime.read_timeout,
'connectTimeout': runtime.connect_timeout,
'maxIdleConns': runtime.max_idle_conns,
'retry': {
'retryable': runtime.autoretry,
'maxAttempts': runtime.max_attempts
},
'ignoreSSL': runtime.ignore_ssl
}
_last_request = None
_last_exception = None
_now = time.time()
_retry_times = 0
while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now):
if _retry_times > 0:
_backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times)
if _backoff_time > 0:
TeaCore.sleep(_backoff_time)
_retry_times = _retry_times + 1
try:
_request = TeaRequest()
_request.method = 'POST'
url = UrlClient.parse_url(self._endpoint)
_request.protocol = url.scheme
_request.port = NumberClient.parse_int(url.host.port)
_request.pathname = f'{UtilClient.default_string(request.pathname, url.path.pathname)}/{request.action}'
_request.headers = TeaCore.merge({
'host': url.host.hostname,
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}, request.headers)
params = TeaCore.merge({
'AppKey': self._app_key,
'Action': request.action
}, request.body)
_request.body = UtilClient.to_jsonstring(params)
_last_request = _request
_response = await TeaCore.async_do_action(_request, _runtime)
if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code):
res = await UtilClient.read_as_json_async(_response.body)
err = UtilClient.assert_as_map(res)
err['statusCode'] = _response.status_code
raise TeaException({
'code': f"{err.get('Code')}",
'message': f"statusCode: {err.get('statusCode')}, errorMessage: {err.get('ErrorMessage')}, requestId: {err.get('RequestId')}",
'data': err
})
return await UtilClient.read_as_string_async(_response.body)
except Exception as e:
if TeaCore.is_retryable(e):
_last_exception = e
continue
raise e
raise UnretryableException(_last_request, _last_exception)