def _get_verified_writer_connection()

in aws_advanced_python_wrapper/aurora_initial_connection_strategy_plugin.py [0:0]


    def _get_verified_writer_connection(self, props: Properties, is_initial_connection: bool, connect_func: Callable) -> Connection | None:
        retry_delay_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props)
        end_time_nano = perf_counter_ns() + (WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props) * 1000000)

        writer_candidate_conn: Optional[Connection]
        writer_candidate: Optional[HostInfo]

        while perf_counter_ns() < end_time_nano:
            writer_candidate_conn = None
            writer_candidate = None

            try:
                writer_candidate = self._get_writer()
                if writer_candidate is None or self._rds_utils.is_rds_cluster_dns(writer_candidate.host):
                    # Writer is not found. Topology is outdated.
                    writer_candidate_conn = connect_func()
                    self._plugin_service.force_refresh_host_list(writer_candidate_conn)
                    writer_candidate = self._plugin_service.identify_connection(writer_candidate_conn)

                    if writer_candidate is not None and writer_candidate.role != HostRole.WRITER:
                        self._close_connection(writer_candidate_conn)
                        self._delay(retry_delay_ms)
                        continue

                    if is_initial_connection and self._host_list_provider_service is not None:
                        self._host_list_provider_service.initial_connection_host_info = writer_candidate

                    return writer_candidate_conn

                writer_candidate_conn = self._plugin_service.connect(writer_candidate, props)

                if self._plugin_service.get_host_role(writer_candidate_conn) != HostRole.WRITER:
                    self._plugin_service.force_refresh_host_list(writer_candidate_conn)
                    self._close_connection(writer_candidate_conn)
                    self._delay(retry_delay_ms)
                    continue

                # Writer connection is valid and verified.
                if is_initial_connection and self._host_list_provider_service is not None:
                    self._host_list_provider_service.initial_connection_host_info = writer_candidate
                return writer_candidate_conn
            except Exception as e:
                self._close_connection(writer_candidate_conn)
                raise e

        return None