in aws_advanced_python_wrapper/stale_dns_plugin.py [0:0]
def get_verified_connection(self, is_initial_connection: bool, host_list_provider_service: HostListProviderService, host_info: HostInfo,
                            props: Properties, connect_func: Callable) -> Connection:
    """
    Open a connection and ensure that a writer cluster endpoint did not hand back a
    connection to a stale (no longer current) writer.

    If ``host_info.host`` is not a writer cluster DNS name, the connection produced by
    ``connect_func`` is returned untouched. Otherwise the IP address the cluster endpoint
    resolves to is compared with the IP address of the writer found in the refreshed
    topology; when they differ, the connection is replaced by a new one opened directly
    against that writer.

    :param is_initial_connection: when True and the connection is replaced, the writer host
        is stored in ``host_list_provider_service.initial_connection_host_info``.
    :param host_list_provider_service: service that receives the writer host on an initial
        connection (see ``is_initial_connection``).
    :param host_info: the host being connected to; only its ``host`` name is inspected.
    :param props: connection properties forwarded to ``plugin_service.connect`` when a
        replacement writer connection is opened.
    :param connect_func: zero-argument callable that opens the connection; it is invoked
        exactly once.
    :return: the connection from ``connect_func``, or a new connection to the current writer
        if the cluster endpoint was found to be stale.
    :raises AwsWrapperError: if stale DNS is detected but the current writer is not among
        ``plugin_service.hosts``.
    """
    if not self._rds_helper.is_writer_cluster_dns(host_info.host):
        return connect_func()

    conn: Connection = connect_func()
    try:
        cluster_inet_address: Optional[str] = None
        try:
            cluster_inet_address = socket.gethostbyname(host_info.host)
        except socket.gaierror:
            # Resolution failure is not fatal: without an address there is nothing to
            # compare against, so the connection is returned as-is below.
            pass

        logger.debug("StaleDnsHelper.ClusterEndpointDns", host_info.host, cluster_inet_address)

        if cluster_inet_address is None:
            return conn

        if self._plugin_service.get_host_role(conn) == HostRole.READER:
            # This if-statement is only reached if the connection url is a writer cluster endpoint.
            # If the new connection resolves to a reader instance, this means the topology is outdated.
            # Force refresh to update the topology.
            self._plugin_service.force_refresh_host_list(conn)
        else:
            self._plugin_service.refresh_host_list(conn)

        logger.debug("LogUtils.Topology", LogUtils.log_topology(self._plugin_service.all_hosts))

        if self._writer_host_info is None:
            writer_candidate: Optional[HostInfo] = self._get_writer()
            if writer_candidate is not None and self._rds_helper.is_rds_cluster_dns(writer_candidate.host):
                # The topology lists the writer under a cluster DNS name rather than an
                # instance name, so it cannot be used to verify the endpoint.
                return conn
            self._writer_host_info = writer_candidate

        logger.debug("StaleDnsHelper.WriterHostSpec", self._writer_host_info)

        writer_host_info = self._writer_host_info
        if writer_host_info is None:
            return conn

        if self._writer_host_address is None:
            try:
                self._writer_host_address = socket.gethostbyname(writer_host_info.host)
            except socket.gaierror:
                # Leave the cached address unset; handled by the None check below.
                pass

        logger.debug("StaleDnsHelper.WriterInetAddress", self._writer_host_address)

        if self._writer_host_address is None:
            return conn

        if self._writer_host_address == cluster_inet_address:
            # The cluster endpoint points at the current writer; nothing to correct.
            return conn

        logger.debug("StaleDnsHelper.StaleDnsDetected", writer_host_info)

        allowed_hosts = self._plugin_service.hosts
        if writer_host_info.host not in {allowed.host for allowed in allowed_hosts}:
            raise AwsWrapperError(
                Messages.get_formatted(
                    "StaleDnsHelper.CurrentWriterNotAllowed",
                    writer_host_info.host,
                    LogUtils.log_topology(allowed_hosts)))

        writer_conn: Connection = self._plugin_service.connect(writer_host_info, props)
        if is_initial_connection:
            host_list_provider_service.initial_connection_host_info = writer_host_info
    except Exception:
        # The caller never receives `conn` on failure, so release it here instead of
        # leaking it. Closing is best-effort so the original error is the one raised.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
        raise

    # Stale DNS confirmed and a writer connection is open: discard the stale connection.
    if conn is not None:
        try:
            conn.close()
        except Exception:
            # Best-effort close; the stale connection is being thrown away regardless.
            pass
    return writer_conn