def _update_secret()

in aws_advanced_python_wrapper/aws_secrets_manager_plugin.py [0:0]


    def _update_secret(self, force_refetch: bool = False) -> bool:
        """
        Called to update credentials from the cache, or from the AWS Secrets Manager service.
        :param force_refetch: Allows ignoring cached credentials and force fetches the latest credentials from the service.
        :return: `True`, if credentials were fetched from the service.
        """
        telemetry_factory = self._plugin_service.get_telemetry_factory()
        context = telemetry_factory.open_telemetry_context("fetch credentials", TelemetryTraceLevel.NESTED)
        self._fetch_credentials_counter.inc()

        try:
            fetched: bool = False
            self._secret: Optional[SimpleNamespace] = AwsSecretsManagerPlugin._secrets_cache.get(self._secret_key)
            endpoint = self._secret_key[2]
            if not self._secret or force_refetch:
                try:
                    self._secret = self._fetch_latest_credentials()
                    if self._secret:
                        AwsSecretsManagerPlugin._secrets_cache[self._secret_key] = self._secret
                        fetched = True
                except (ClientError, AttributeError) as e:
                    logger.debug("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)
                    raise AwsWrapperError(
                        Messages.get_formatted("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)) from e
                except JSONDecodeError as e:
                    logger.debug("AwsSecretsManagerPlugin.JsonDecodeError", e)
                    raise AwsWrapperError(
                        Messages.get_formatted("AwsSecretsManagerPlugin.JsonDecodeError", e))
                except EndpointConnectionError:
                    logger.debug("AwsSecretsManagerPlugin.EndpointOverrideInvalidConnection", endpoint)
                    raise AwsWrapperError(
                        Messages.get_formatted("AwsSecretsManagerPlugin.EndpointOverrideInvalidConnection", endpoint))
                except ValueError:
                    logger.debug("AwsSecretsManagerPlugin.EndpointOverrideMisconfigured", endpoint)
                    raise AwsWrapperError(
                        Messages.get_formatted("AwsSecretsManagerPlugin.EndpointOverrideMisconfigured", endpoint))

            return fetched
        except Exception as ex:
            context.set_success(False)
            context.set_exception(ex)
            raise ex
        finally:
            context.close_context()