in pathology/shared_libs/flags/secret_flag_utils.py [0:0]
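def _read_secrets(secret_name: str) -> Mapping[str, Any]:
  """Returns the JSON mapping stored in a Secret Manager secret.

  An empty name, a falsy _ENABLE_ENV_SECRET_MANAGER, or an empty payload each
  yield an empty dict. A successfully decoded mapping is stored in the
  module-level _cache under secret_name and served from there on later calls.

  Args:
    secret_name: Secret resource name; must fully match _PARSE_SECRET_CONFIG.

  Returns:
    Mapping decoded from the secret's JSON payload, or {} as described above.

  Raises:
    SecretDecodeError: The name is malformed, the secret was not found,
      permission to read it was denied, or the payload is not UTF-8 encoded
      JSON that defines a mapping.
  """
  if not secret_name:
    return {}
  match = _PARSE_SECRET_CONFIG.fullmatch(secret_name)
  if match is None:
    # NOTE(review): the '[' in this message is never closed; confirm the
    # intended wording against _PARSE_SECRET_CONFIG before changing it.
    raise SecretDecodeError(
        'incorrectly formatted secret; expecting'
        f' [projects/.+/secrets/.+/versions/.+; passed {secret_name}.',
        secret_name=secret_name,
    )
  project, secret, *_, version = match.groups()
  # The name is validated before this switch is consulted, so a malformed
  # name is reported even when the secret manager is disabled.
  if not _ENABLE_ENV_SECRET_MANAGER:
    return {}
  # NOTE(review): this listing lost its indentation; the lock is assumed to
  # cover the cache read, the fetch and the cache write. Confirm the scope
  # against the original file.
  with _cache_lock:
    cached_val = _cache.get(secret_name)
    if cached_val is not None:
      return cached_val
    with secretmanager.SecretManagerServiceClient() as client:
      parent = client.secret_path(project, secret)
      try:
        # No version in the name: ask _get_secret_version which one to read.
        if not version:
          version = _get_secret_version(client, parent)
        response = client.access_secret_version(
            request={'name': f'{parent}/versions/{version}'}
        )
      except google.api_core.exceptions.NotFound as exp:
        raise SecretDecodeError(
            'Secret not found.', secret_name=secret_name
        ) from exp
      except google.api_core.exceptions.PermissionDenied as exp:
        raise SecretDecodeError(
            'Permission denied reading secret.', secret_name=secret_name
        ) from exp
      # Any other google.api_core exception propagates unchanged.
    data = response.payload.data
    if not data:
      # Empty payloads are not cached, so the next call fetches again.
      return {}
    if isinstance(data, bytes):
      try:
        data = data.decode('utf-8')
      except UnicodeDecodeError as exp:
        # Report a non-UTF-8 payload through the documented error type
        # instead of leaking a bare UnicodeDecodeError to callers. The
        # payload is deliberately not attached: it is secret material.
        raise SecretDecodeError(
            'Could not decode secret value.', secret_name=secret_name
        ) from exp
    # NOTE(review): `data` below is the secret's plaintext payload; confirm
    # SecretDecodeError never writes it to logs or into its message.
    try:
      value = json.loads(data)
    except json.JSONDecodeError as exp:
      raise SecretDecodeError(
          'Could not decode secret value.', secret_name=secret_name, data=data
      ) from exp
    if not isinstance(value, Mapping):
      raise SecretDecodeError(
          'Secret value does not define a mapping.',
          secret_name=secret_name,
          data=data,
      )
    # Only well-formed mappings reach the cache; every caller for this name
    # then receives this same object.
    _cache[secret_name] = value
    return value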