in src/watchdog/__init__.py [0:0]
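def get_aws_security_credentials_from_webidentity(config, role_arn, token_file, region):
    """Exchange a web-identity (OIDC) token for temporary AWS credentials via STS.

    Reads the token from ``token_file`` and calls ``AssumeRoleWithWebIdentity`` on
    the regional STS endpoint derived from ``STS_ENDPOINT_URL_FORMAT``.

    Args:
        config: Mount-helper configuration, passed through to ``url_request_helper``.
        role_arn: ARN of the IAM role to assume.
        token_file: Path of the file holding the web-identity token.
        region: AWS region used to build the STS endpoint URL.

    Returns:
        A dict with ``AccessKeyId``, ``SecretAccessKey`` and ``Token`` on success;
        ``None`` if the token cannot be read or is empty, the request fails, or the
        response lacks any of the three credential fields. Failures are logged,
        never raised.
    """
    # Best-effort read: any failure (missing file, permissions, bad encoding) is
    # logged and reported as "no credentials" so the caller can fall back to
    # another credential source instead of crashing the mount.
    try:
        with open(token_file, "r") as f:
            # Strip surrounding whitespace so a trailing newline in the file is not
            # URL-encoded into the token and rejected by STS.
            token = f.read().strip()
    except Exception as e:
        logging.error("Error reading token file %s: %s", token_file, e)
        return None

    # An empty token would only yield an opaque STS error; fail early with a
    # message that names the actual cause.
    if not token:
        logging.error("Token file %s is empty", token_file)
        return None

    sts_endpoint_url = STS_ENDPOINT_URL_FORMAT.format(region)
    # The call uses the query-string form of AssumeRoleWithWebIdentity, so the
    # token itself is part of the URL. The error messages below therefore name
    # only the endpoint, never the full URL; url_request_helper is assumed not to
    # log the URL it is given either — confirm when changing that helper.
    query = urlencode(
        {
            "Version": "2011-06-15",
            "Action": "AssumeRoleWithWebIdentity",
            "RoleArn": role_arn,
            "RoleSessionName": "efs-mount-helper",
            "WebIdentityToken": token,
        }
    )
    webidentity_url = sts_endpoint_url + "?" + query

    unsuccessful_resp = (
        "Unsuccessful retrieval of AWS security credentials at %s." % sts_endpoint_url
    )
    url_error_msg = (
        "Unable to reach %s to retrieve AWS security credentials. See %s for more info."
        % (sts_endpoint_url, SECURITY_CREDS_WEBIDENTITY_HELP_URL)
    )
    resp = url_request_helper(
        config,
        webidentity_url,
        unsuccessful_resp,
        url_error_msg,
        headers={"Accept": "application/json"},
    )
    if not resp:
        return None

    creds = (
        resp.get("AssumeRoleWithWebIdentityResponse", {})
        .get("AssumeRoleWithWebIdentityResult", {})
        .get("Credentials", {})
    )
    # All three fields are required; a partial credential set would only fail
    # later, with a less actionable error, when it is used.
    required = ("AccessKeyId", "SecretAccessKey", "SessionToken")
    if not all(k in creds for k in required):
        return None

    return {
        "AccessKeyId": creds["AccessKeyId"],
        "SecretAccessKey": creds["SecretAccessKey"],
        "Token": creds["SessionToken"],
    }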