in aws_advanced_python_wrapper/aurora_initial_connection_strategy_plugin.py [0:0]
def _get_verified_reader_connection(self, props: Properties, is_initial_connection: bool, connect_func: Callable) -> Optional[Connection]:
    """Open a connection and verify that it is connected to a reader instance.

    Attempts are repeated, pausing for the configured retry interval between
    them, until the configured retry timeout elapses. If the cluster turns out
    to have no readers at all, the (writer) connection that was just opened is
    returned instead, simulating the Aurora reader cluster endpoint.

    :param props: connection properties; the retry interval and the retry
        timeout are read from them, and they are used to connect to the
        selected reader candidate.
    :param is_initial_connection: when true and a host list provider service
        is available, the host info associated with the returned connection is
        stored as the initial connection host info.
    :param connect_func: zero-argument callable that opens a connection; used
        when no reader candidate is found or the candidate is a cluster DNS
        endpoint.
    :return: the verified connection, or ``None`` if no connection could be
        verified before the retry timeout elapsed.
    :raises Exception: any exception raised during an attempt is re-raised
        after the candidate connection has been closed.
    """
    retry_delay_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props)
    # The overall deadline is driven by the retry *timeout*; the retry interval
    # is only the pause between two attempts.
    retry_timeout_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.get_int(props)
    end_time_nano = perf_counter_ns() + retry_timeout_ms * 1_000_000  # ms -> ns

    reader_candidate_conn: Optional[Connection]
    reader_candidate: Optional[HostInfo]

    while perf_counter_ns() < end_time_nano:
        # Reset per attempt so the except handler never sees stale values.
        reader_candidate_conn = None
        reader_candidate = None

        try:
            reader_candidate = self._get_reader(props)

            if reader_candidate is None or self._rds_utils.is_rds_cluster_dns(reader_candidate.host):
                # READER is not found. Topology is outdated.
                reader_candidate_conn = connect_func()
                self._plugin_service.force_refresh_host_list(reader_candidate_conn)

                reader_candidate = self._plugin_service.identify_connection(reader_candidate_conn)

                if reader_candidate is not None and reader_candidate.role != HostRole.READER:
                    if self._has_no_readers():
                        # Cluster has no readers. Simulate Aurora reader cluster endpoint logic
                        # and return the current writer connection.
                        if is_initial_connection and self._host_list_provider_service is not None:
                            self._host_list_provider_service.initial_connection_host_info = reader_candidate
                        return reader_candidate_conn

                    # Connected to a non-reader although readers exist: drop it and try again.
                    self._close_connection(reader_candidate_conn)
                    self._delay(retry_delay_ms)
                    continue

                # Either the connection was identified as a reader, or it could not be
                # identified at all; in both cases it is handed back to the caller.
                if is_initial_connection and self._host_list_provider_service is not None:
                    self._host_list_provider_service.initial_connection_host_info = reader_candidate
                return reader_candidate_conn

            reader_candidate_conn = self._plugin_service.connect(reader_candidate, props)

            if self._plugin_service.get_host_role(reader_candidate_conn) != HostRole.READER:
                # If the new connection resolves to a writer instance, the topology is outdated.
                # Force refresh to update the topology.
                self._plugin_service.force_refresh_host_list(reader_candidate_conn)

                if self._has_no_readers():
                    # Cluster has no readers. Simulate Aurora reader cluster endpoint logic
                    # and return the current writer connection.
                    if is_initial_connection and self._host_list_provider_service is not None:
                        self._host_list_provider_service.initial_connection_host_info = reader_candidate
                    return reader_candidate_conn

                self._close_connection(reader_candidate_conn)
                self._delay(retry_delay_ms)
                continue

            # Reader connection is valid and verified.
            if is_initial_connection and self._host_list_provider_service is not None:
                self._host_list_provider_service.initial_connection_host_info = reader_candidate
            return reader_candidate_conn

        except Exception as e:
            # Never leak the half-verified connection.
            self._close_connection(reader_candidate_conn)
            # A failure that is not a login error is attributed to the candidate host,
            # so that host is marked unavailable before the error is propagated.
            if not self._plugin_service.is_login_exception(e) and reader_candidate is not None:
                self._plugin_service.set_availability(reader_candidate.as_aliases(), HostAvailability.UNAVAILABLE)
            raise

    # Retry timeout elapsed without obtaining a verified connection.
    return None