aws_advanced_python_wrapper/reader_failover_handler.py (166 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, List, Tuple
from aws_advanced_python_wrapper.utils.properties import PropertiesUtils
if TYPE_CHECKING:
from aws_advanced_python_wrapper.plugin_service import PluginService
from aws_advanced_python_wrapper.utils.properties import Properties
from aws_advanced_python_wrapper.pep249 import Connection
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from copy import deepcopy
from random import shuffle
from threading import Event
from time import sleep
from typing import Optional
from aws_advanced_python_wrapper.failover_result import ReaderFailoverResult
from aws_advanced_python_wrapper.host_availability import HostAvailability
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
from aws_advanced_python_wrapper.utils.failover_mode import (FailoverMode,
get_failover_mode)
from aws_advanced_python_wrapper.utils.log import Logger
logger = Logger(__name__)
class ReaderFailoverHandler:
"""
Interface for Reader Failover Process handler. This handler implements all necessary logic to try to reconnect to another reader host.
"""
@abstractmethod
def failover(self, current_topology: Tuple[HostInfo, ...], current_host: Optional[HostInfo]) -> ReaderFailoverResult:
"""
Called to start Reader Failover Process. This process tries to connect to any reader.
If no reader is available then driver may also try to connect to a writer host, down hosts, and the current reader host.
:param current_topology: The current cluster topology.
:param current_host: The :py:class:`HostInfo` containing information regarding the current connection.
:return: The results of this process.
"""
pass
@abstractmethod
def get_reader_connection(self, hosts: Tuple[HostInfo, ...]) -> ReaderFailoverResult:
"""
Called to get any available reader connection. If no reader is available then result of process is unsuccessful.
This process will not attempt to connect to the writer host.
:param hosts: The current cluster topology.
:return: The results of the failover process.
"""
pass
class ReaderFailoverHandlerImpl(ReaderFailoverHandler):
failed_reader_failover_result = ReaderFailoverResult(None, False, None, None)
def __init__(
self,
plugin_service: PluginService,
properties: Properties,
max_timeout_sec: float = 60,
timeout_sec: float = 30):
self._plugin_service = plugin_service
self._properties = properties
self._max_failover_timeout_sec = max_timeout_sec
self._timeout_sec = timeout_sec
mode = get_failover_mode(self._properties)
self._strict_reader_failover = True if mode is not None and mode == FailoverMode.STRICT_READER else False
self._timeout_event = Event()
@property
def timeout_sec(self):
return self._timeout_sec
@timeout_sec.setter
def timeout_sec(self, value):
self._timeout_sec = value
def failover(self, current_topology: Tuple[HostInfo, ...], current_host: Optional[HostInfo]) -> ReaderFailoverResult:
if current_topology is None or len(current_topology) == 0:
logger.debug("ReaderFailoverHandler.InvalidTopology", "failover")
return ReaderFailoverHandlerImpl.failed_reader_failover_result
result: ReaderFailoverResult = ReaderFailoverHandlerImpl.failed_reader_failover_result
with ThreadPoolExecutor(thread_name_prefix="ReaderFailoverHandlerExecutor") as executor:
future = executor.submit(self._internal_failover_task, current_topology, current_host)
try:
result = future.result(timeout=self._max_failover_timeout_sec)
if result is None:
result = ReaderFailoverHandlerImpl.failed_reader_failover_result
except TimeoutError:
self._timeout_event.set()
return result
def _internal_failover_task(
self, topology: Tuple[HostInfo, ...], current_host: Optional[HostInfo]) -> ReaderFailoverResult:
try:
while not self._timeout_event.is_set():
result = self._failover_internal(topology, current_host)
if result is not None and result.is_connected:
if not self._strict_reader_failover:
return result # any host is fine
# need to ensure that the new connection is to a reader host
self._plugin_service.force_refresh_host_list(result.connection)
if result.new_host is not None:
topology = self._plugin_service.all_hosts
for host in topology:
# found new connection host in the latest topology
if host.url == result.new_host.url and host.role == HostRole.READER:
return result
# New host is not found in the latest topology. There are few possible reasons for that.
# - Host is not yet presented in the topology due to failover process in progress
# - Host is in the topology but its role isn't a READER (that is not acceptable option due to this.strictReader setting)
# Need to continue this loop and to make another try to connect to a reader.
if result.connection is not None:
result.connection.close()
sleep(1) # Sleep for 1 second
except Exception as err:
return ReaderFailoverResult(None, False, None, err)
return ReaderFailoverHandlerImpl.failed_reader_failover_result
def _failover_internal(self, hosts: Tuple[HostInfo, ...], current_host: Optional[HostInfo]) -> ReaderFailoverResult:
if current_host is not None:
self._plugin_service.set_availability(current_host.all_aliases, HostAvailability.UNAVAILABLE)
hosts_by_priority = ReaderFailoverHandlerImpl.get_hosts_by_priority(hosts, self._strict_reader_failover)
return self._get_connection_from_host_group(hosts_by_priority)
def get_reader_connection(self, hosts: Tuple[HostInfo, ...]) -> ReaderFailoverResult:
if hosts is None or len(hosts) == 0:
logger.debug("ReaderFailoverHandler.InvalidTopology", "get_reader_connection")
return ReaderFailoverHandlerImpl.failed_reader_failover_result
hosts_by_priority = ReaderFailoverHandlerImpl.get_reader_hosts_by_priority(hosts)
return self._get_connection_from_host_group(hosts_by_priority)
def _get_connection_from_host_group(self, hosts: Tuple[HostInfo, ...]) -> ReaderFailoverResult:
for i in range(0, len(hosts), 2):
result = self._get_result_from_next_task_batch(hosts, i)
if result.is_connected or result.exception is not None:
return result
sleep(1) # Sleep for 1 second
return ReaderFailoverHandlerImpl.failed_reader_failover_result
def _get_result_from_next_task_batch(self, hosts: Tuple[HostInfo, ...], i: int) -> ReaderFailoverResult:
with ThreadPoolExecutor(thread_name_prefix="ReaderFailoverHandlerRetrieveResultsExecutor") as executor:
futures = [executor.submit(self.attempt_connection, hosts[i])]
if i + 1 < len(hosts):
futures.append(executor.submit(self.attempt_connection, hosts[i + 1]))
try:
for future in as_completed(futures, timeout=self.timeout_sec):
result = future.result()
if result.is_connected or result.exception is not None:
executor.shutdown(wait=False)
return result
except TimeoutError:
self._timeout_event.set()
finally:
self._timeout_event.set()
return ReaderFailoverHandlerImpl.failed_reader_failover_result
def attempt_connection(self, host: HostInfo) -> ReaderFailoverResult:
props: Properties = deepcopy(self._properties)
logger.debug("ReaderFailoverHandler.AttemptingReaderConnection", host.url, PropertiesUtils.mask_properties(props))
try:
conn: Connection = self._plugin_service.force_connect(host, props, self._timeout_event)
self._plugin_service.set_availability(host.all_aliases, HostAvailability.AVAILABLE)
logger.debug("ReaderFailoverHandler.SuccessfulReaderConnection", host.url)
return ReaderFailoverResult(conn, True, host, None)
except Exception as ex:
logger.debug("ReaderFailoverHandler.FailedReaderConnection", host.url)
self._plugin_service.set_availability(host.all_aliases, HostAvailability.UNAVAILABLE)
if not self._plugin_service.is_network_exception(ex):
return ReaderFailoverResult(None, False, None, ex)
return ReaderFailoverHandlerImpl.failed_reader_failover_result
@classmethod
def get_hosts_by_priority(cls, hosts, readers_only: bool):
active_readers: List[HostInfo] = []
down_hosts: List[HostInfo] = []
writer_host: Optional[HostInfo] = None
for host in hosts:
if host.role == HostRole.WRITER:
writer_host = host
continue
if host.get_raw_availability() == HostAvailability.AVAILABLE:
active_readers.append(host)
else:
down_hosts.append(host)
shuffle(active_readers)
shuffle(down_hosts)
hosts_by_priority = active_readers
num_readers = len(active_readers) + len(down_hosts)
if writer_host is not None and (not readers_only or num_readers == 0):
hosts_by_priority.append(writer_host)
hosts_by_priority += down_hosts
return tuple(hosts_by_priority)
@classmethod
def get_reader_hosts_by_priority(cls, hosts: Tuple[HostInfo, ...]) -> Tuple[HostInfo, ...]:
active_readers: List[HostInfo] = []
down_hosts: List[HostInfo] = []
for host in hosts:
if host.role == HostRole.WRITER:
continue
if host.get_raw_availability() == HostAvailability.AVAILABLE:
active_readers.append(host)
else:
down_hosts.append(host)
shuffle(active_readers)
shuffle(down_hosts)
return tuple(active_readers + down_hosts)