in share/secretsmanager.py [0:0]
def aws_sm_expander(config_yaml: str) -> str:
    """
    Expand the AWS Secrets Manager references found in a config file.

    The content is scanned for ``arn:aws:secretsmanager:<region>:<account>:secret:<name>``
    (whole secret used as plain text) and ``...:secret:<name>:<key>`` (a single key of a
    key/value secret). A reference extends up to the next single or double quote.
    Each distinct secret is fetched once through ``get_secret_values`` and parsed with
    ``parse_secrets_str``; every reference is then replaced by its value.

    Secrets are identified by their base ARN (the first 7 colon-separated components),
    so the same secret name in different regions or accounts is handled independently.

    Returns:
        The config content with every reference replaced by the fetched value.

    Raises:
        SyntaxError: a reference does not split into 7 or 8 colon-separated components.
        ValueError: the region or the secret name is empty; the same secret is referenced
            both as plain text and by key; the requested key is empty; the fetched secret
            does not have the expected shape (string vs. key/value pairs); or the fetched
            value is empty.
        KeyError: the requested key is not present in the fetched secret.
    """

    arn_pattern = re.compile(r"arn:aws:secretsmanager:(?:[^:]+)?:(?:[^:]+)?:secret:(?:[^\"']+)?")

    # Distinct references in order of first appearance -> (base ARN, key components).
    # The key components list is empty for a plain text reference and holds exactly
    # one item for a `...:<name>:<key>` reference.
    references: dict[str, tuple[str, list[str]]] = {}
    # Base ARN -> number of components of the first reference seen for it; used to
    # reject mixing the plain text and the key form for the same secret.
    components_count_by_base_arn: dict[str, int] = {}

    for secret_arn in arn_pattern.findall(config_yaml):
        components = secret_arn.split(":")
        if len(components) not in (7, 8):
            raise SyntaxError(f"Invalid arn format: {secret_arn}")

        if components[3] == "":
            raise ValueError(f"Must be provided region in arn: {secret_arn}")

        if components[6] == "":
            raise ValueError(f"Must be provided secrets manager name in arn: {secret_arn}")

        base_arn = ":".join(components[:7])
        if components_count_by_base_arn.setdefault(base_arn, len(components)) != len(components):
            raise ValueError(f"You cannot have both plain text and json key for the same secret: {secret_arn}")

        references.setdefault(secret_arn, (base_arn, components[7:]))

    # Fetch and parse every distinct secret exactly once.
    secrets_by_base_arn: dict[str, Any] = {}
    for base_arn in components_count_by_base_arn:
        region = base_arn.split(":")[3]
        str_secrets = get_secret_values(base_arn, region)
        secrets_by_base_arn[base_arn] = parse_secrets_str(str_secrets, base_arn)

    # Resolve each reference to the value that will replace it.
    resolved_values: dict[str, str] = {}
    for reference, (base_arn, key_components) in references.items():
        secret = secrets_by_base_arn[base_arn]

        if key_components:
            wanted_key = key_components[0]
            if wanted_key == "":
                raise ValueError(f"Error for secret {reference}: key must not be empty")

            if not isinstance(secret, dict):
                raise ValueError(f"Error for secret {reference}: expected to be keys/values pair")

            if wanted_key not in secret:
                raise KeyError(f"Error for secret {reference}: key not found")

            value = secret[wanted_key]
            if value == "":
                raise ValueError(f"Error for secret {reference}: must not be empty")
        else:
            if secret == "":
                raise ValueError(f"Error for secret {reference}: must not be empty")

            if not isinstance(secret, str):
                raise ValueError(f"Error for secret {reference}: expected to be a string")

            value = secret

        resolved_values[reference] = value

    # Substitute in a single pass with the very same pattern used for discovery: each
    # match is replaced exactly once by its own value. Chained `str.replace` calls would
    # corrupt a reference that has another reference as a textual prefix, and would
    # re-expand fetched values that happen to contain a reference.
    return arn_pattern.sub(lambda match: resolved_values[match.group(0)], config_yaml)