in aws_emr_launch/lambda_sources/emr_utilities/run_job_flow/lambda_source.py [0:0]
def get_secret_value(secret_id: str) -> Dict[str, Any]:
    """Fetch a secret through the ``secretsmanager`` client and parse it as JSON.

    Args:
        secret_id: Value passed as ``SecretId`` to ``get_secret_value``.

    Returns:
        The JSON-decoded payload, taken from ``SecretString`` when the
        response contains it, otherwise from the base64-decoded
        ``SecretBinary``.

    Raises:
        SecretDecryptionFailureError: The client reported the error code
            ``DecryptionFailureException``.
        SecretNotFoundError: The client reported the error code
            ``ResourceNotFoundException``.
        ClientError: Any other client error, re-raised unchanged.
    """
    try:
        secret_response = secretsmanager.get_secret_value(SecretId=secret_id)
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        # Chain with ``from e`` so the underlying ClientError stays attached
        # as the explicit cause of the translated domain error.
        if error_code == "DecryptionFailureException":
            raise SecretDecryptionFailureError(f"SecretDecryptionFailure: {secret_id}") from e
        if error_code == "ResourceNotFoundException":
            raise SecretNotFoundError(f"SecretNotFound: {secret_id}") from e
        # Bare ``raise`` re-raises the active exception as-is.
        raise

    if "SecretString" in secret_response:
        payload = secret_response["SecretString"]
    else:
        # NOTE(review): boto3 appears to return SecretBinary already
        # base64-decoded; this extra decode presumably expects the stored
        # bytes to be base64 text themselves -- confirm against how the
        # secrets are written.
        payload = base64.b64decode(secret_response["SecretBinary"])

    val: Dict[str, Any] = json.loads(payload)
    logger.info(f"SecretFound: {secret_id}")
    return val