in aws_advanced_python_wrapper/federated_plugin.py [0:0]
def _connect(self, host_info: HostInfo, props: Properties, connect_func: Callable) -> Connection:
SamlUtils.check_idp_credentials_with_fallback(props)
host = IamAuthUtils.get_iam_host(props, host_info)
port = IamAuthUtils.get_port(props, host_info, self._plugin_service.database_dialect.default_port)
region = self._region_utils.get_region(props, WrapperProperties.IAM_REGION.name, host, self._session)
if not region:
error_message = "RdsUtils.UnsupportedHostname"
logger.debug(error_message, host)
raise AwsWrapperError(Messages.get_formatted(error_message, host))
user = WrapperProperties.DB_USER.get(props)
cache_key: str = IamAuthUtils.get_cache_key(
user,
host,
port,
region
)
token_info = FederatedAuthPlugin._token_cache.get(cache_key)
if token_info is not None and not token_info.is_expired():
logger.debug("FederatedAuthPlugin.UseCachedToken", token_info.token)
self._plugin_service.driver_dialect.set_password(props, token_info.token)
else:
self._update_authentication_token(host_info, props, user, region, cache_key)
WrapperProperties.USER.set(props, WrapperProperties.DB_USER.get(props))
try:
return connect_func()
except Exception:
self._update_authentication_token(host_info, props, user, region, cache_key)
try:
return connect_func()
except Exception as e:
error_message = "FederatedAuthPlugin.UnhandledException"
logger.debug(error_message, e)
raise AwsWrapperError(Messages.get_formatted(error_message, e)) from e