in src/aws_secretsmanager_caching/decorators.py [0:0]
def __call__(self, func):
    """
    Return a function with injected keyword arguments from a cached secret.

    The secret is read from ``self.cache`` and parsed each time the returned
    function is called, not once when the decorator is applied.

    :type func: object
    :param func: function for injecting keyword arguments.
    :return: The original function with injected keyword arguments
    """

    @wraps(func)
    def _wrapped_func(*args, **kwargs):
        """
        Internal function to execute wrapped function

        :raises RuntimeError: if the cached secret cannot be parsed as JSON,
            or a key named in ``self.kwarg_map`` cannot be looked up in it.
        """
        # Fetch outside the ``try`` so that only parsing failures are
        # translated; errors raised by the cache itself propagate unchanged.
        secret_string = self.cache.get_secret_string(
            secret_id=self.secret_id
        )
        try:
            secret = json.loads(secret_string)
        except (json.decoder.JSONDecodeError, TypeError):
            # json.loads raises TypeError when handed something that is not
            # str/bytes/bytearray (for example ``None``); report it the same
            # way as malformed JSON instead of leaking a raw TypeError.
            raise RuntimeError("Cached secret is not valid JSON") from None

        resolved_kwargs = {}
        for orig_kwarg, secret_key in self.kwarg_map.items():
            try:
                resolved_kwargs[orig_kwarg] = secret[secret_key]
            except (KeyError, IndexError, TypeError):
                # IndexError/TypeError occur when the parsed secret is not a
                # JSON object (e.g. a list, string or number) and therefore
                # cannot provide the requested key.
                raise RuntimeError(
                    f"Cached secret does not contain key {secret_key}"
                ) from None

        # A keyword supplied by the caller that is also injected from the
        # secret makes Python raise TypeError (duplicate keyword argument).
        return func(*args, **resolved_kwargs, **kwargs)

    return _wrapped_func