def get_aws_security_credentials_from_webidentity()

in src/watchdog/__init__.py [0:0]


def get_aws_security_credentials_from_webidentity(config, role_arn, token_file, region):
    try:
        with open(token_file, "r") as f:
            token = f.read()
    except Exception as e:
        logging.error("Error reading token file %s: %s", token_file, e)
        return None

    STS_ENDPOINT_URL = STS_ENDPOINT_URL_FORMAT.format(region)
    webidentity_url = (
        STS_ENDPOINT_URL
        + "?"
        + urlencode(
            {
                "Version": "2011-06-15",
                "Action": "AssumeRoleWithWebIdentity",
                "RoleArn": role_arn,
                "RoleSessionName": "efs-mount-helper",
                "WebIdentityToken": token,
            }
        )
    )

    unsuccessful_resp = (
        "Unsuccessful retrieval of AWS security credentials at %s." % STS_ENDPOINT_URL
    )
    url_error_msg = (
        "Unable to reach %s to retrieve AWS security credentials. See %s for more info."
        % (STS_ENDPOINT_URL, SECURITY_CREDS_WEBIDENTITY_HELP_URL)
    )
    resp = url_request_helper(
        config,
        webidentity_url,
        unsuccessful_resp,
        url_error_msg,
        headers={"Accept": "application/json"},
    )

    if resp:
        creds = (
            resp.get("AssumeRoleWithWebIdentityResponse", {})
            .get("AssumeRoleWithWebIdentityResult", {})
            .get("Credentials", {})
        )
        if all(k in creds for k in ["AccessKeyId", "SecretAccessKey", "SessionToken"]):
            return {
                "AccessKeyId": creds["AccessKeyId"],
                "SecretAccessKey": creds["SecretAccessKey"],
                "Token": creds["SessionToken"],
            }

    return None