def get_verified_connection()

in aws_advanced_python_wrapper/stale_dns_plugin.py [0:0]


    def get_verified_connection(self, is_initial_connection: bool, host_list_provider_service: HostListProviderService, host_info: HostInfo,
                                props: Properties, connect_func: Callable) -> Connection:
        """
        Ensure the connection created is not a stale writer connection that
        :param is_initial_connection:
        :param host_list_provider_service:
        :param host_info:
        :param props:
        :param connect_func:
        :return:
        """
        if not self._rds_helper.is_writer_cluster_dns(host_info.host):
            return connect_func()

        conn: Connection = connect_func()

        cluster_inet_address: Optional[str] = None
        try:
            cluster_inet_address = socket.gethostbyname(host_info.host)
        except socket.gaierror:
            pass

        host_inet_address: Optional[str] = cluster_inet_address

        logger.debug("StaleDnsHelper.ClusterEndpointDns", host_info.host, host_inet_address)

        if cluster_inet_address is None:
            return conn

        if self._plugin_service.get_host_role(conn) == HostRole.READER:
            # This if-statement is only reached if the connection url is a writer cluster endpoint.
            # If the new connection resolves to a reader instance, this means the topology is outdated.
            # Force refresh to update the topology.
            self._plugin_service.force_refresh_host_list(conn)
        else:
            self._plugin_service.refresh_host_list(conn)

        logger.debug("LogUtils.Topology", LogUtils.log_topology(self._plugin_service.all_hosts))

        if self._writer_host_info is None:
            writer_candidate: Optional[HostInfo] = self._get_writer()
            if writer_candidate is not None and self._rds_helper.is_rds_cluster_dns(writer_candidate.host):
                return conn

            self._writer_host_info = writer_candidate

        logger.debug("StaleDnsHelper.WriterHostSpec", self._writer_host_info)

        if self._writer_host_info is None:
            return conn

        if self._writer_host_address is None:
            try:
                self._writer_host_address = socket.gethostbyname(self._writer_host_info.host)
            except socket.gaierror:
                pass

        logger.debug("StaleDnsHelper.WriterInetAddress", self._writer_host_address)

        if self._writer_host_address is None:
            return conn

        if self._writer_host_address != cluster_inet_address:
            logger.debug("StaleDnsHelper.StaleDnsDetected", self._writer_host_info)

            allowed_hosts = self._plugin_service.hosts
            allowed_hostnames = [host.host for host in allowed_hosts]
            if self._writer_host_info.host not in allowed_hostnames:
                raise AwsWrapperError(
                    Messages.get_formatted(
                        "StaleDnsHelper.CurrentWriterNotAllowed",
                        "<null>" if self._writer_host_info is None else self._writer_host_info.host,
                        LogUtils.log_topology(allowed_hosts)))

            writer_conn: Connection = self._plugin_service.connect(self._writer_host_info, props)
            if is_initial_connection:
                host_list_provider_service.initial_connection_host_info = self._writer_host_info

            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
                return writer_conn

        return conn