def get_aws_security_credentials_from_pod_identity()

in src/mount_efs/__init__.py [0:0]


def get_aws_security_credentials_from_pod_identity(config, is_fatal=False):
    """Fetch AWS security credentials from the container credentials endpoint.

    The endpoint URI and the path of the auth-token file are read from the
    environment variables named by AWS_CONTAINER_CREDS_FULL_URI_ENV and
    AWS_CONTAINER_AUTH_TOKEN_FILE_ENV. If either variable is unset, this
    credential source is treated as not configured and (None, None) is
    returned without reporting an error, regardless of is_fatal.

    config is passed through unchanged to url_request_helper.

    When is_fatal is true, every later failure (unreadable token file, token
    containing a line break, missing or incomplete response) is reported via
    fatal_error before falling through to (None, None).
    NOTE(review): fatal_error is presumably expected to terminate the process;
    the trailing returns only matter if it returns - confirm.

    Returns a tuple (credentials, source): the response dict and a string of
    the form "podidentity:<uri>,<token file>" on success, otherwise
    (None, None).
    """
    environment = os.environ
    if not (
        AWS_CONTAINER_CREDS_FULL_URI_ENV in environment
        and AWS_CONTAINER_AUTH_TOKEN_FILE_ENV in environment
    ):
        return None, None

    creds_uri = environment[AWS_CONTAINER_CREDS_FULL_URI_ENV]
    token_file = environment[AWS_CONTAINER_AUTH_TOKEN_FILE_ENV]

    try:
        with open(token_file, "r") as token_handle:
            auth_token = token_handle.read().strip()
            # strip() only removes surrounding whitespace; a CR/LF left inside
            # the token would end up inside the Authorization header value.
            has_line_break = any(ch in auth_token for ch in ("\r", "\n"))
            if has_line_break:
                if is_fatal:
                    invalid_token_msg = (
                        "AWS Container Auth Token contains invalid characters"
                    )
                    fatal_error(invalid_token_msg, invalid_token_msg)
                return None, None
    except Exception as e:
        if is_fatal:
            read_failure_msg = (
                f"Error reading Aws Container Auth Token file {token_file}: {e}"
            )
            fatal_error(read_failure_msg, read_failure_msg)
        return None, None

    retrieval_failure_msg = f"Unsuccessful retrieval of AWS security credentials from Container Credentials URI at {creds_uri}"
    unreachable_msg = f"Unable to reach Container Credentials URI at {creds_uri}"

    response = url_request_helper(
        config,
        creds_uri,
        retrieval_failure_msg,
        unreachable_msg,
        headers={"Authorization": auth_token},
    )

    # Reject an empty/falsy response or one lacking any expected credential key.
    if not response or any(key not in response for key in CREDENTIALS_KEYS):
        if is_fatal:
            fatal_error(retrieval_failure_msg, retrieval_failure_msg)
        return None, None

    return response, f"podidentity:{creds_uri},{token_file}"