in src/mount_efs/__init__.py [0:0]
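def get_aws_security_credentials_from_pod_identity(config, is_fatal=False):
    """Fetch AWS credentials from the container credentials URI (Pod Identity).

    The endpoint URI and the path of the auth-token file are read from the
    environment variables named by AWS_CONTAINER_CREDS_FULL_URI_ENV and
    AWS_CONTAINER_AUTH_TOKEN_FILE_ENV. The token is read from that file and
    sent as the ``Authorization`` header of the request that
    url_request_helper makes.

    Args:
        config: passed through unchanged to url_request_helper.
        is_fatal: when true, each failure is also reported through
            fatal_error before the function returns.

    Returns:
        ``(credentials, "podidentity:<uri>,<token_file>")`` when the response
        contains every key in CREDENTIALS_KEYS. ``(None, None)`` when either
        environment variable is unset, the token file cannot be read, the
        token is empty or contains CR/LF, or the response is missing or
        incomplete.
    """
    if (
        AWS_CONTAINER_CREDS_FULL_URI_ENV not in os.environ
        or AWS_CONTAINER_AUTH_TOKEN_FILE_ENV not in os.environ
    ):
        return None, None

    # NOTE(review): creds_uri is used exactly as found in the environment and
    # the bearer token is sent to it. No scheme or host check is visible in
    # this function; AWS SDK container-credential providers generally accept
    # this URI only over HTTPS or for loopback / known link-local hosts.
    # Confirm whether url_request_helper enforces an equivalent rule;
    # otherwise the token can travel in clear text or reach an unintended host.
    creds_uri = os.environ[AWS_CONTAINER_CREDS_FULL_URI_ENV]
    token_file = os.environ[AWS_CONTAINER_AUTH_TOKEN_FILE_ENV]

    # Only the file read sits inside the try block, so the handler below
    # reports read failures and cannot relabel an error raised while the
    # token is being validated.
    try:
        with open(token_file, "r") as f:
            token = f.read().strip()
    except Exception as e:
        if is_fatal:
            unsuccessful_resp = (
                f"Error reading Aws Container Auth Token file {token_file}: {e}"
            )
            fatal_error(unsuccessful_resp, unsuccessful_resp)
        return None, None

    # An empty token cannot authenticate the request; stop before contacting
    # the endpoint with a blank Authorization header.
    if not token:
        if is_fatal:
            unsuccessful_resp = f"AWS Container Auth Token file {token_file} is empty"
            fatal_error(unsuccessful_resp, unsuccessful_resp)
        return None, None

    # The token becomes a header value. strip() has already removed leading
    # and trailing whitespace, so any CR or LF still present is interior and
    # could split the header; such a token is rejected.
    if "\r" in token or "\n" in token:
        if is_fatal:
            unsuccessful_resp = (
                "AWS Container Auth Token contains invalid characters"
            )
            fatal_error(unsuccessful_resp, unsuccessful_resp)
        return None, None

    # The messages built here name the URI and the token file path, never the
    # token value itself.
    unsuccessful_resp = f"Unsuccessful retrieval of AWS security credentials from Container Credentials URI at {creds_uri}"
    url_error_msg = f"Unable to reach Container Credentials URI at {creds_uri}"
    pod_identity_security_dict = url_request_helper(
        config,
        creds_uri,
        unsuccessful_resp,
        url_error_msg,
        headers={"Authorization": token},
    )

    # Accept the response only when every expected credential field is present.
    if pod_identity_security_dict and all(
        k in pod_identity_security_dict for k in CREDENTIALS_KEYS
    ):
        return pod_identity_security_dict, f"podidentity:{creds_uri},{token_file}"

    if is_fatal:
        fatal_error(unsuccessful_resp, unsuccessful_resp)
    return None, None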