in aws_advanced_python_wrapper/aurora_initial_connection_strategy_plugin.py [0:0]
def _get_verified_writer_connection(
        self,
        props: Properties,
        is_initial_connection: bool,
        connect_func: Callable) -> Connection | None:
    """Open a connection and confirm it targets a writer, retrying until a deadline.

    Each attempt looks up the writer via ``self._get_writer()``:

    * If no writer is known, or the known writer host is an RDS cluster DNS
      name, a connection is opened with ``connect_func()``, the host list is
      force-refreshed through it, and the connected host is identified. The
      connection is returned unless the identified host reports a non-writer
      role. A connection whose host could not be identified (``None``) is
      returned as-is.
    * Otherwise a connection is opened directly to the writer candidate via
      the plugin service and is returned only if its reported role is
      ``HostRole.WRITER``.

    A rejected candidate connection is closed, the method waits for the retry
    interval, and then tries again.

    Args:
        props: Connection properties. Supplies the retry interval and the
            overall retry timeout, and is passed to the plugin service when
            connecting to a specific writer candidate.
        is_initial_connection: When true, and a host list provider service is
            set, the accepted host is stored on it as
            ``initial_connection_host_info``.
        connect_func: Zero-argument callable that opens a connection; used
            when no usable writer candidate is available.

    Returns:
        The accepted connection, or ``None`` if the retry deadline passes
        without one.

    Raises:
        Exception: Any exception raised during an attempt is re-raised after
            the candidate connection (if any) has been passed to
            ``self._close_connection``.
    """
    retry_delay_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props)
    # The overall deadline must come from the retry *timeout*, not the retry
    # interval: with the interval, the loop would expire after a single delay
    # and never actually retry.
    retry_timeout_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.get_int(props)
    end_time_nano = perf_counter_ns() + retry_timeout_ms * 1_000_000

    writer_candidate_conn: Optional[Connection]
    writer_candidate: Optional[HostInfo]

    while perf_counter_ns() < end_time_nano:
        # Reset per attempt so the except-handler never closes a stale
        # connection from a previous iteration.
        writer_candidate_conn = None
        writer_candidate = None

        try:
            writer_candidate = self._get_writer()

            if writer_candidate is None or self._rds_utils.is_rds_cluster_dns(writer_candidate.host):
                # Writer is not found. Topology is outdated.
                writer_candidate_conn = connect_func()
                self._plugin_service.force_refresh_host_list(writer_candidate_conn)
                writer_candidate = self._plugin_service.identify_connection(writer_candidate_conn)

                if writer_candidate is not None and writer_candidate.role != HostRole.WRITER:
                    # Connected to something that is not the writer; retry.
                    self._close_connection(writer_candidate_conn)
                    self._delay(retry_delay_ms)
                    continue

                if is_initial_connection and self._host_list_provider_service is not None:
                    self._host_list_provider_service.initial_connection_host_info = writer_candidate

                return writer_candidate_conn

            writer_candidate_conn = self._plugin_service.connect(writer_candidate, props)

            if self._plugin_service.get_host_role(writer_candidate_conn) != HostRole.WRITER:
                # The candidate is not a writer: refresh the host list through
                # this connection before discarding it, then retry.
                self._plugin_service.force_refresh_host_list(writer_candidate_conn)
                self._close_connection(writer_candidate_conn)
                self._delay(retry_delay_ms)
                continue

            # Writer connection is valid and verified.
            if is_initial_connection and self._host_list_provider_service is not None:
                self._host_list_provider_service.initial_connection_host_info = writer_candidate

            return writer_candidate_conn

        except Exception:
            # Hand the half-open candidate to the close helper, then propagate
            # the original error with its traceback intact.
            self._close_connection(writer_candidate_conn)
            raise

    return None