in api/PclusterApiHandler.py [0:0]
def sigv4_request(method, host, path, params={}, headers={}, body=None):
"Make a signed request to an api-gateway hosting an AWS ParallelCluster API."
endpoint = host.replace("https://", "").replace("http://", "")
_api_id, _service, region, _domain = endpoint.split(".", maxsplit=3)
request_parameters = "&".join([f"{k}={v}" for k, v in (params or {}).items()])
url = f"{host}{path}?{request_parameters}"
if API_USER_ROLE:
sts_credentials = setup_api_credentials(API_USER_ROLE)
session = boto3.session.Session(
aws_access_key_id=sts_credentials["AccessKeyId"],
aws_secret_access_key=sts_credentials["SecretAccessKey"],
aws_session_token=sts_credentials["SessionToken"]
)
else:
session = boto3.session.Session()
body_data = json.dumps(body) if body else None
new_request = botocore.awsrequest.AWSRequest(method=method, url=url, data=body_data)
botocore.auth.SigV4Auth(session.get_credentials(), "execute-api", region).add_auth(new_request)
boto_request = new_request.prepare()
req_call = {
"POST": requests.post,
"GET": requests.get,
"PUT": requests.put,
"PATCH": requests.patch,
"DELETE": requests.delete,
}.get(method)
if body:
boto_request.headers["content-type"] = "application/json"
for k, val in headers.items():
boto_request.headers[k] = val
return req_call(boto_request.url, data=body_data, headers=boto_request.headers, timeout=30)