aws_advanced_python_wrapper/writer_failover_handler.py (231 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Tuple
from aws_advanced_python_wrapper.utils.properties import PropertiesUtils
if TYPE_CHECKING:
from aws_advanced_python_wrapper.plugin_service import PluginService
from aws_advanced_python_wrapper.utils.properties import Properties
from aws_advanced_python_wrapper.pep249 import Connection
from aws_advanced_python_wrapper.reader_failover_handler import ReaderFailoverHandler
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from threading import Event
from time import sleep
from typing import Optional
from aws_advanced_python_wrapper import LogUtils
from aws_advanced_python_wrapper.errors import AwsWrapperError
from aws_advanced_python_wrapper.failover_result import (ReaderFailoverResult,
WriterFailoverResult)
from aws_advanced_python_wrapper.host_availability import HostAvailability
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
from aws_advanced_python_wrapper.utils.log import Logger
from aws_advanced_python_wrapper.utils.messages import Messages
logger = Logger(__name__)
class WriterFailoverHandler:
    """
    Contract for the writer failover process handler.

    An implementation carries the logic needed to reconnect to the current writer host or to a newly elected writer.
    """

    @abstractmethod
    def failover(self, current_topology: Tuple[HostInfo, ...]) -> WriterFailoverResult:
        """
        Start the writer failover process, which tries to establish a connection to a writer instance after
        failover has occurred.

        :param current_topology: the current cluster topology.
        :return: the results of this process.
        """
        ...
class WriterFailoverHandlerImpl(WriterFailoverHandler):
    """
    An implementation of :py:class:`WriterFailoverHandler`.

    The goal of the writer failover process is to re-establish a connection to a writer.
    The connection to a writer may be disrupted either by a temporary network issue or by the writer host being
    unavailable during cluster failover.

    This handler tries two approaches in parallel:
    1. try to re-connect to the writer host that originally failed.
    2. try to update the cluster topology using a reader connection, and use that information to connect to a newly elected writer.
    """
    # Single shared result object returned whenever failover does not produce a writer connection.
    failed_writer_failover_result = WriterFailoverResult(False, False, None, None, None, None)
    # Class-level defaults for the working state of Task B; the methods of this class rebind them on the instance.
    # Writer connection assigned by connect_to_writer and returned by wait_for_new_writer.
    _current_connection: Optional[Connection] = None
    # Topology assigned by wait_for_new_writer and refresh_topology_and_connect_to_new_writer.
    _current_topology: Optional[Tuple[HostInfo, ...]] = None
    # Reader connection and reader host assigned by connect_to_reader and reset by close_reader_connection.
    _current_reader_connection: Optional[Connection] = None
    _current_reader_host: Optional[HostInfo] = None
def __init__(
        self,
        plugin_service: PluginService,
        reader_failover_handler: ReaderFailoverHandler,
        initial_connection_properties: Properties,
        max_timeout_sec: float = 60,
        read_topology_interval_sec: float = 5,
        reconnect_writer_interval_sec: float = 5):
    """
    Store the collaborators and timing settings used by the failover tasks.

    :param plugin_service: service used to force connections, refresh the host list and set host availability.
    :param reader_failover_handler: handler that ``connect_to_reader`` asks for a reader connection.
    :param initial_connection_properties: properties passed to ``force_connect`` when connecting to a writer.
    :param max_timeout_sec: timeout, in seconds, applied while waiting for the failover tasks to complete.
    :param read_topology_interval_sec: pause, in seconds, between topology refreshes while waiting for a new writer.
    :param reconnect_writer_interval_sec: pause, in seconds, between attempts to reconnect to the original writer.
    """
    # Event checked by both tasks to know when to stop; it is also handed to force_connect.
    self._timeout_event = Event()

    # Timing settings.
    self._max_failover_timeout_sec = max_timeout_sec
    self._read_topology_interval_sec = read_topology_interval_sec
    self._reconnect_writer_interval_sec = reconnect_writer_interval_sec

    # Collaborators.
    self._plugin_service = plugin_service
    self._reader_failover_handler = reader_failover_handler
    self._initial_connection_properties = initial_connection_properties
def failover(self, current_topology: Tuple[HostInfo, ...]) -> WriterFailoverResult:
    """
    Run the writer failover process for the given topology.

    :param current_topology: the current cluster topology.
    :return: the task result when a task either connected or reported an exception; otherwise the shared
        ``failed_writer_failover_result``. The shared failed result is also returned when the topology is
        ``None`` or empty.
    """
    if not current_topology:
        logger.debug("WriterFailoverHandler.FailoverCalledWithInvalidTopology")
        return WriterFailoverHandlerImpl.failed_writer_failover_result

    outcome: WriterFailoverResult = self.get_result_from_future(current_topology)
    if not outcome.is_connected and outcome.exception is None:
        # Neither a connection nor an error to report.
        logger.debug("WriterFailoverHandler.FailedToConnectToWriterInstance")
        return WriterFailoverHandlerImpl.failed_writer_failover_result

    return outcome
def get_writer(self, topology: Tuple[HostInfo, ...]) -> Optional[HostInfo]:
    """
    Find the writer in a topology.

    :param topology: the hosts to search; may be ``None`` or empty.
    :return: the first host whose role is ``HostRole.WRITER``, or ``None`` when there is no such host.
    """
    if not topology:
        return None

    return next((candidate for candidate in topology if candidate.role == HostRole.WRITER), None)
def get_result_from_future(self, current_topology: Tuple[HostInfo, ...]) -> WriterFailoverResult:
    """
    Run Task A (``reconnect_to_writer``) and Task B (``wait_for_new_writer``) in parallel and return the first
    useful result.

    The writer found in ``current_topology`` is marked UNAVAILABLE before the tasks start. Results are taken in
    completion order; the first one that is connected, or that carries an exception, is returned.

    :param current_topology: the current cluster topology.
    :return: the first connected or failed-with-exception task result, or the shared
        ``failed_writer_failover_result`` when the topology has no writer, no task produced such a result, or
        the tasks did not finish within ``max_timeout_sec``.
    :raises Exception: an exception raised inside a task is re-raised here by ``future.result()``.
    """
    writer_host: Optional[HostInfo] = self.get_writer(current_topology)
    if writer_host is None:
        return WriterFailoverHandlerImpl.failed_writer_failover_result

    # The event is set at the end of every run (see the finally clause below). Clear it first, otherwise a
    # handler that is used for a second failover would start with both tasks already cancelled.
    self._timeout_event.clear()
    self._plugin_service.set_availability(writer_host.as_aliases(), HostAvailability.UNAVAILABLE)

    # NOTE: leaving the ``with`` block waits for both workers to finish, even after shutdown(wait=False);
    # the event set in ``finally`` is what tells the remaining task to stop.
    with ThreadPoolExecutor(thread_name_prefix="WriterFailoverHandlerExecutor") as executor:
        try:
            futures = [executor.submit(self.reconnect_to_writer, writer_host),
                       executor.submit(self.wait_for_new_writer, current_topology, writer_host)]
            for future in as_completed(futures, timeout=self._max_failover_timeout_sec):
                result = future.result()
                if result.is_connected:
                    executor.shutdown(wait=False)
                    self.log_task_success(result)
                    return result
                if result.exception is not None:
                    executor.shutdown(wait=False)
                    return result
        except TimeoutError:
            # TimeoutError is the name imported from concurrent.futures at the top of the file.
            # No task finished in time: fall through to the failed result.
            pass
        finally:
            # Always signal the tasks to stop, whether we return a result, time out or propagate an error.
            self._timeout_event.set()

    return WriterFailoverHandlerImpl.failed_writer_failover_result
def log_task_success(self, result: WriterFailoverResult) -> None:
    """
    Log the outcome of a task that reported a successful connection.

    An error is logged when the result carries no topology; otherwise a debug message names the writer found in
    the result's topology and says whether it is a new host or the original one.

    :param result: the successful task result.
    """
    topology: Optional[Tuple[HostInfo, ...]] = result.topology
    if not topology:
        # Test the task name itself; the previous ``if not None`` was a constant condition, so the "None"
        # placeholder could never be used.
        task_name: str = result.task_name if result.task_name is not None else "None"
        logger.error("WriterFailoverHandler.SuccessfulConnectionInvalidTopology", task_name)
        return

    writer_host: Optional[HostInfo] = self.get_writer(topology)
    new_writer_host: str = "None" if writer_host is None else writer_host.url
    if result.is_new_host:
        logger.debug("WriterFailoverHandler.SuccessfullyConnectedToNewWriterInstance", new_writer_host)
    else:
        logger.debug("WriterFailoverHandler.SuccessfullyReconnectedToWriterInstance", new_writer_host)
def reconnect_to_writer(self, initial_writer_host: HostInfo) -> WriterFailoverResult:
    """
    Task A: Attempt to reconnect to the writer that originally failed.

    The task keeps connecting to ``initial_writer_host`` and refreshing the host list until a non-empty topology
    is obtained or the timeout event is set. The connection is returned only when the refreshed topology still
    lists this host as the writer; in every other case it is closed before the task finishes.

    :param initial_writer_host: the writer host that originally failed.
    :return: the :py:class:`WriterFailoverResult` of the failover process. A non-network exception raised while
        connecting is returned inside the result rather than raised.
    """
    logger.debug("WriterFailoverHandler.TaskAAttemptReconnectToWriterInstance", initial_writer_host.url,
                 PropertiesUtils.mask_properties(self._initial_connection_properties))

    conn: Optional[Connection] = None
    latest_topology: Optional[Tuple[HostInfo, ...]] = None
    success: bool = False

    try:
        while not self._timeout_event.is_set() and not latest_topology:
            try:
                # Drop the connection left over from a previous attempt before opening a new one.
                if conn is not None:
                    conn.close()

                conn = self._plugin_service.force_connect(
                    initial_writer_host, self._initial_connection_properties, self._timeout_event)
                self._plugin_service.force_refresh_host_list(conn)
                latest_topology = self._plugin_service.all_hosts
            except Exception as ex:
                # Network errors are retried; anything else is reported to the caller through the result.
                if not self._plugin_service.is_network_exception(ex):
                    logger.debug("WriterFailoverHandler.TaskAEncounteredException", ex)
                    return WriterFailoverResult(False, False, None, None, "TaskA", ex)

            if not latest_topology:
                # Wait on the event instead of sleeping so that the task stops as soon as it is cancelled.
                self._timeout_event.wait(self._reconnect_writer_interval_sec)

        if not latest_topology:
            # Cancelled before a topology could be read: the host was never reached, so its availability is
            # left untouched instead of being reported as AVAILABLE.
            return WriterFailoverResult(False, False, None, None, "TaskA", None)

        success = self.is_current_host_writer(latest_topology, initial_writer_host)
        self._plugin_service.set_availability(initial_writer_host.as_aliases(), HostAvailability.AVAILABLE)
        return WriterFailoverResult(success, False, latest_topology, conn if success else None, "TaskA", None)
    except Exception as ex:
        logger.error("WriterFailoverHandler.TaskAEncounteredException", ex)
        return WriterFailoverResult(False, False, None, None, "TaskA", None)
    finally:
        try:
            # The connection is handed to the caller only on success; otherwise close it here.
            if conn is not None and not success:
                conn.close()
        except Exception:
            pass
        logger.debug("WriterFailoverHandler.TaskAFinished")
def is_current_host_writer(self, latest_topology: Tuple[HostInfo, ...], initial_writer_host: HostInfo) -> bool:
    """
    Tell whether the original writer is still the writer in the latest topology.

    :param latest_topology: the most recently fetched topology.
    :param initial_writer_host: the writer host that originally failed.
    :return: `True` if the latest topology has a writer that shares at least one alias with
        ``initial_writer_host``. `False` otherwise.
    """
    writer_now: Optional[HostInfo] = self.get_writer(latest_topology)
    if writer_now is None:
        return False

    writer_now_aliases: frozenset[str] = writer_now.all_aliases
    original_aliases: frozenset[str] = initial_writer_host.all_aliases
    if not original_aliases:
        return False

    return not original_aliases.isdisjoint(writer_now_aliases)
def wait_for_new_writer(self, current_topology: Tuple[HostInfo, ...], current_host: HostInfo) -> WriterFailoverResult:
    """
    Task B: Attempt to connect to a newly elected writer.

    The task repeatedly connects to a reader, refreshes the topology through it and tries to connect to the
    writer it finds, until that succeeds or the timeout event is set.

    :param current_topology: the latest topology.
    :param current_host: the current host.
    :return: the :py:class:`WriterFailoverResult` of the process. The result is connected only when a writer
        connection was actually established.
    :raises Exception: any exception other than ``InterruptedError`` raised by the steps above is logged and
        re-raised.
    """
    logger.debug("WriterFailoverHandler.TaskBAttemptConnectionToNewWriterInstance",
                 PropertiesUtils.mask_properties(self._initial_connection_properties))

    self._current_topology = current_topology
    try:
        success: bool = False
        while not self._timeout_event.is_set() and not success:
            self.connect_to_reader()
            success = self.refresh_topology_and_connect_to_new_writer(current_host)
            if not success:
                self.close_reader_connection()

        if not success:
            # The loop ended because the timeout event was set, not because a writer was found. Report a
            # failure rather than a "connected" result that carries no usable connection.
            return WriterFailoverResult(False, False, None, None, "TaskB", None)

        return WriterFailoverResult(True, True, self._current_topology, self._current_connection, "TaskB", None)
    except InterruptedError:
        return WriterFailoverResult(False, False, None, None, "TaskB", None)
    except Exception as ex:
        logger.debug("WriterFailoverHandler.TaskBEncounteredException", ex)
        # Bare raise keeps the original traceback.
        raise
    finally:
        self.cleanup()
        logger.debug("WriterFailoverHandler.TaskBFinished")
def connect_to_reader(self) -> None:
    """
    Obtain a reader connection through the reader failover handler, retrying until one is available or the
    timeout event is set.

    On success the connection and its host are stored in ``_current_reader_connection`` and
    ``_current_reader_host``.

    :raises AwsWrapperError: if ``_current_topology`` is ``None``.
    """
    while not self._timeout_event.is_set():
        if self._current_topology is None:
            raise AwsWrapperError(Messages.get("WriterFailoverHandler.CurrentTopologyNone"))

        try:
            conn_result: ReaderFailoverResult = self._reader_failover_handler.get_reader_connection(self._current_topology)

            # check if valid reader connection
            if conn_result.is_connected and conn_result.connection is not None and conn_result.new_host is not None:
                self._current_reader_connection = conn_result.connection
                self._current_reader_host = conn_result.new_host
                logger.debug("WriterFailoverHandler.TaskBConnectedToReader", self._current_reader_host.url)
                return
        except Exception:
            # Best effort: a failed attempt is simply retried on the next iteration.
            pass

        logger.debug("WriterFailoverHandler.TaskBFailedToConnectToAnyReader")
        # Wait on the event instead of sleeping so that the retry pause ends as soon as the task is cancelled.
        self._timeout_event.wait(1)
def refresh_topology_and_connect_to_new_writer(self, initial_writer_host: HostInfo) -> bool:
    """
    Re-fetch topology and wait for a new writer.

    The topology is refreshed through the current reader connection until it lists a writer other than
    ``initial_writer_host`` and a connection to that writer succeeds, or until the timeout event is set.

    :param initial_writer_host: the writer host that originally failed.
    :return: `True` if a connection to a newly elected writer was successfully established. `False` otherwise,
        including when refreshing the topology raises an exception.
    """
    while not self._timeout_event.is_set():
        try:
            self._plugin_service.force_refresh_host_list(self._current_reader_connection)
            current_topology: Tuple[HostInfo, ...] = self._plugin_service.all_hosts
            host_count = len(current_topology)

            if host_count == 1:
                # currently connected reader is in the middle of failover. It is not yet connected to a new writer and works
                # as a standalone host. The handler must wait until the reader connects to the entire cluster to fetch the
                # cluster topology
                logger.debug("WriterFailoverHandler.StandaloneHost",
                             "None" if self._current_reader_host is None else self._current_reader_host.url)
            elif host_count > 1:
                self._current_topology = current_topology
                writer_candidate: Optional[HostInfo] = self.get_writer(self._current_topology)

                # A topology without any writer is not a "new writer"; keep polling in that case.
                if writer_candidate is not None and not self.is_same(writer_candidate, initial_writer_host):
                    # new writer available
                    logger.debug("LogUtils.Topology", LogUtils.log_topology(self._current_topology, "[TaskB]"))

                    if self.connect_to_writer(writer_candidate):
                        return True
        except Exception as ex:
            logger.debug("WriterFailoverHandler.TaskBEncounteredException", ex)
            return False

        # Wait on the event instead of sleeping so that polling stops as soon as the task is cancelled.
        self._timeout_event.wait(self._read_topology_interval_sec)

    return False
def is_same(self, host_info: Optional[HostInfo], current_host_info: Optional[HostInfo]) -> bool:
    """
    Compare two hosts by URL.

    :param host_info: the first host, or ``None``.
    :param current_host_info: the second host, or ``None``.
    :return: `True` if both hosts are given and their ``url`` values are equal. `False` otherwise.
    """
    both_given = host_info is not None and current_host_info is not None
    return both_given and host_info.url == current_host_info.url
def connect_to_writer(self, writer_candidate: Optional[HostInfo]) -> bool:
    """
    Establish the writer connection for Task B and store it in ``_current_connection``.

    When the candidate is the host of the current reader connection, that connection is reused. Otherwise a new
    connection is forced to the candidate, and the candidate's availability is updated according to the outcome.

    :param writer_candidate: the host reported as writer by the refreshed topology, or ``None``.
    :return: `True` if ``_current_connection`` now refers to a connection to the candidate. `False` otherwise.
    """
    if self.is_same(writer_candidate, self._current_reader_host):
        logger.debug("WriterFailoverHandler.AlreadyWriter")
        self._current_connection = self._current_reader_connection
        return True

    candidate_url = "None" if writer_candidate is None else writer_candidate.url
    logger.debug("WriterFailoverHandler.TaskBAttemptConnectionToNewWriter", candidate_url)

    if writer_candidate is None:
        # Nothing to connect to.
        return False

    try:
        # connect to new writer
        self._current_connection = self._plugin_service.force_connect(writer_candidate,
                                                                      self._initial_connection_properties,
                                                                      self._timeout_event)
        self._plugin_service.set_availability(writer_candidate.as_aliases(), HostAvailability.AVAILABLE)
    except Exception:
        self._plugin_service.set_availability(writer_candidate.as_aliases(), HostAvailability.UNAVAILABLE)
        return False

    return True
def close_reader_connection(self) -> None:
    """
    Close the current reader connection, if there is one, and reset the reader connection and reader host
    fields to ``None``. Errors raised while closing are ignored.
    """
    reader_conn = self._current_reader_connection
    try:
        if reader_conn is not None:
            reader_conn.close()
    except Exception:
        # Closing is best effort; the fields are reset regardless.
        pass
    finally:
        self._current_reader_connection = self._current_reader_host = None
def cleanup(self) -> None:
    """
    Close the reader connection unless it is also the current writer connection. Errors raised while closing
    are ignored.
    """
    reader_conn = self._current_reader_connection
    if reader_conn is None or self._current_connection is reader_conn:
        # Either there is nothing to close, or the reader connection is the one being handed out.
        return

    try:
        reader_conn.close()
    except Exception:
        pass