in src/aws_secretsmanager_caching/decorators.py [0:0]
def __call__(self, func):
    """
    Return a function with injected keyword arguments from a cached secret.

    The secret is read from ``self.cache`` each time the returned function is
    invoked, not once at decoration time, so every call uses whatever the
    cache currently returns and decorating a function does not itself trigger
    a secret lookup.

    :type func: object
    :param func: function for injecting keyword arguments.
    :return The original function with injected keyword arguments
    :raises RuntimeError: raised by the returned function when the cached
        secret is not valid JSON or does not contain a key named in the
        keyword map.
    """
    # Function-scope import keeps this method self-contained regardless of
    # what the module imports at the top of the file.
    from functools import wraps

    @wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def _wrapped_func(*args, **kwargs):
        """
        Internal function to execute wrapped function
        """
        try:
            secret = json.loads(self.cache.get_secret_string(secret_id=self.secret_id))
        except json.decoder.JSONDecodeError:
            raise RuntimeError('Cached secret is not valid JSON') from None

        # Map each target keyword argument to the value stored under its
        # configured key in the parsed secret.
        resolved_kwargs = {}
        for orig_kwarg, secret_key in self.kwarg_map.items():
            try:
                resolved_kwargs[orig_kwarg] = secret[secret_key]
            except KeyError:
                raise RuntimeError('Cached secret does not contain key {0}'.format(secret_key)) from None

        # A keyword supplied both by the secret and by the caller raises
        # TypeError, as with any repeated keyword argument.
        return func(*args, **resolved_kwargs, **kwargs)

    return _wrapped_func