in src/ab/utils/sae_util.py [0:0]
def _sae_error(details):
    """Wrap *details* in the error envelope returned by ``send_request``."""
    return {"Message": "error", "Details": details}


def send_request(sae_request: SaeRequest):
    """Send the call described by *sae_request* and return the reply as a dict.

    A ``CommonRequest`` is populated from the getters on *sae_request*
    (method, protocol type, domain, version, then either the URI pattern or
    the action name, plus any query parameters) and dispatched through an
    ``AcsClient`` built from the request's access key, secret key and region.

    Returns:
        The JSON response decoded into a ``dict``. If sending, decoding or
        converting the response raises, a dict of the form
        ``{"Message": "error", "Details": "<ExceptionKind>:<text>"}`` is
        returned instead of raising.

    Note:
        Only the send/parse step is guarded; an exception raised while the
        client or the request is being built propagates to the caller.
    """
    access_key = AccessKeyCredential(sae_request.get_ak(), sae_request.get_sk())
    acs_client = AcsClient(
        region_id=sae_request.get_region_id(),
        credential=access_key,
    )

    req = CommonRequest()
    req.set_accept_format('json')
    req.add_header('Content-Type', 'application/json')
    req.set_method(sae_request.get_method())
    req.set_protocol_type(sae_request.get_protocol_type())
    req.set_domain(sae_request.get_domain())
    req.set_version(sae_request.get_version())

    # Address the call by URI pattern when a URL is given, otherwise by
    # action name.
    if sae_request.get_url() is None:
        req.set_action_name(sae_request.get_action())
    else:
        req.set_uri_pattern(sae_request.get_url())

    # Only dict-shaped params are forwarded; anything else (including None)
    # is ignored. Iterate over a copy, as the original implementation did.
    query = sae_request.get_params()
    if isinstance(query, dict):
        for name, val in dict(query).items():
            req.add_query_param(name, val)

    # The request body is always an empty JSON object.
    req.set_content('''{}'''.encode('utf-8'))

    try:
        raw = acs_client.do_action_with_exception(req)
        payload = json.loads(str(raw, encoding='utf-8'))
        return dict(payload)
    except ServerException as err:
        return _sae_error("ServerException:{}".format(err))
    except ClientException as err:
        return _sae_error("ClientException:{}".format(err))
    except Exception as err:
        return _sae_error("Exception:{}".format(err))