def get_aws_security_credentials_from_pod_identity()

in src/watchdog/__init__.py [0:0]


def get_aws_security_credentials_from_pod_identity(config, value):
    """Fetch AWS security credentials using a container auth token file.

    ``value`` must consist of exactly two comma-separated fields: the
    credentials URI, followed by the path of a file containing the
    authorization token. The token is read from that file, stripped of
    surrounding whitespace, and passed as the ``Authorization`` header of
    the request made through ``url_request_helper``.

    Args:
        config: Passed through unchanged to ``url_request_helper``.
        value: ``"<credentials URI>,<token file path>"``.

    Returns:
        The object returned by ``url_request_helper`` when it is truthy and
        contains the ``AccessKeyId``, ``SecretAccessKey`` and ``Token`` keys;
        otherwise ``None``. ``None`` is also returned when ``value`` is
        malformed, the token file cannot be read, or the token still holds a
        carriage return or line feed after stripping.
    """
    pieces = value.split(",")
    if len(pieces) != 2:
        logging.info("Invalid Aws Container Auth token URI format")
        return None
    creds_uri, token_file = pieces

    try:
        with open(token_file, "r") as token_fh:
            auth_token = token_fh.read().strip()
    except Exception as err:
        logging.error("Error reading token file %s: %s", token_file, err)
        return None

    # strip() only trims the ends, so any CR/LF found here sits inside the
    # token itself; such a token is rejected rather than sent as a header.
    if any(line_break in auth_token for line_break in ("\r", "\n")):
        logging.error("AWS Container Auth Token contains invalid characters")
        return None

    credentials = url_request_helper(
        config,
        creds_uri,
        f"Unsuccessful retrieval of AWS security credentials at {creds_uri}",
        f"Unable to reach {creds_uri} to retrieve AWS security credentials",
        headers={"Authorization": auth_token},
    )

    if not credentials:
        return None

    required_keys = ("AccessKeyId", "SecretAccessKey", "Token")
    if any(key not in credentials for key in required_keys):
        return None

    return credentials