in aws_advanced_python_wrapper/host_list_provider.py [0:0]
def identify_connection(self, connection: Optional[Connection]) -> Optional[HostInfo]:
    """
    Resolve the host list entry that corresponds to an open connection.

    ``self._identify_connection`` is wrapped with ``preserve_transaction_status_with_timeout``
    (using ``self._max_timeout``) and called on the connection. The first element of its result
    is treated as the host id and matched against ``host_id`` of each entry in the host list.
    The list is taken from :py:meth:`refresh`; :py:meth:`force_refresh` is used as a fallback
    when that list is empty or does not contain the id, and is called at most once.

    :param connection: an opened connection.
    :return: the matching :py:class:`HostInfo`, or ``None`` when no host list could be obtained
        or no entry has the identified host id.
    :raises AwsWrapperError: if ``connection`` is ``None`` or the identification call returns
        an empty/falsy result.
    :raises QueryTimeoutError: if a ``TimeoutError`` is raised while identifying the connection
        or while refreshing the host list.
    """
    if connection is None:
        raise AwsWrapperError(Messages.get("RdsHostListProvider.ErrorIdentifyConnection"))

    dialect = self._host_list_provider_service.driver_dialect

    def _lookup(candidates, wanted_id):
        # First entry whose host_id equals the identified id, else None.
        return next((entry for entry in candidates if entry.host_id == wanted_id), None)

    try:
        with_timeout = preserve_transaction_status_with_timeout(
            RdsHostListProvider._executor, self._max_timeout, dialect, connection)
        timed_identify = with_timeout(self._identify_connection)
        row = timed_identify(connection)
        if row:
            target_id = row[0]

            # Try the regular refresh first; remember whether we already had to force one
            # so that force_refresh is never invoked twice.
            topology = self.refresh(connection)
            already_forced = not topology
            if already_forced:
                topology = self.force_refresh(connection)
            if not topology:
                return None

            match: Optional[HostInfo] = _lookup(topology, target_id)
            if match or already_forced:
                return match

            # The regular host list did not contain the id: retry once against a
            # forced refresh before giving up.
            topology = self.force_refresh(connection)
            if not topology:
                return None
            return _lookup(topology, target_id)
    except TimeoutError as e:
        raise QueryTimeoutError(Messages.get("RdsHostListProvider.IdentifyConnectionTimeout")) from e

    # Reached only when the identification call produced no usable result.
    raise AwsWrapperError(Messages.get("RdsHostListProvider.ErrorIdentifyConnection"))