in aws_advanced_python_wrapper/writer_failover_handler.py [0:0]
def refresh_topology_and_connect_to_new_writer(self, initial_writer_host: HostInfo) -> bool:
    """
    Poll the cluster topology until a writer different from the failed one can be connected to.

    Each pass refreshes the host list through the current reader connection, inspects the
    resulting topology and, when the reported writer is not ``initial_writer_host``, tries to
    connect to it. Between passes the method sleeps for ``self._read_topology_interval_sec``.

    :param initial_writer_host: the writer host that originally failed.
    :return: `True` if a connection to a newly elected writer was successfully established.
        `False` if the timeout event is set first or any exception is raised while polling.
    """
    while True:
        if self._timeout_event.is_set():
            # Ran out of time without reaching a new writer.
            return False

        try:
            self._plugin_service.force_refresh_host_list(self._current_reader_connection)
            refreshed_hosts: Tuple[HostInfo, ...] = self._plugin_service.all_hosts
            host_count = len(refreshed_hosts)

            if host_count == 1:
                # The reader we are connected to is itself mid-failover: it has not joined the
                # new writer yet and reports itself as a standalone host. Keep polling until it
                # can see the whole cluster again.
                reader_url = self._current_reader_host.url if self._current_reader_host is not None else "None"
                logger.debug("WriterFailoverHandler.StandaloneHost", reader_url)
            elif host_count > 1:
                self._current_topology = refreshed_hosts
                new_writer: Optional[HostInfo] = self.get_writer(self._current_topology)

                writer_unchanged = self.is_same(new_writer, initial_writer_host)
                if not writer_unchanged:
                    # The topology now reports a writer other than the one that failed.
                    topology_dump = LogUtils.log_topology(self._current_topology, "[TaskB]")
                    logger.debug("LogUtils.Topology", topology_dump)

                    if self.connect_to_writer(new_writer):
                        return True
            # An empty topology needs no handling here; just wait and retry.
        except Exception as ex:
            # Any failure while polling ends this task; it is logged, not propagated.
            logger.debug("WriterFailoverHandler.TaskBEncounteredException", ex)
            return False

        sleep(self._read_topology_interval_sec)