aws_advanced_python_wrapper/connection_provider.py (108 lines of code) (raw):

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations from typing import TYPE_CHECKING, Callable, Dict, Optional, Protocol, Tuple if TYPE_CHECKING: from aws_advanced_python_wrapper.database_dialect import DatabaseDialect from aws_advanced_python_wrapper.driver_dialect import DriverDialect from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole from aws_advanced_python_wrapper.pep249 import Connection from threading import Lock from aws_advanced_python_wrapper.errors import AwsWrapperError from aws_advanced_python_wrapper.host_selector import (HostSelector, RandomHostSelector, RoundRobinHostSelector) from aws_advanced_python_wrapper.plugin import CanReleaseResources from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, PropertiesUtils) logger = Logger(__name__) class ConnectionProvider(Protocol): def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool: """ Indicates whether this ConnectionProvider can provide connections for the given host and properties. Some :py:class:`ConnectionProvider` implementations may not be able to handle certain connection properties. :param host_info: the :py:class:`HostInfo` containing the host-port information for the host to connect to. :param props: the connection properties. 
:return: `True` if this :py:class:`ConnectionProvider` can provide connections for the given host. `False` otherwise. """ ... def accepts_strategy(self, role: HostRole, strategy: str) -> bool: """ Indicates whether the given selection strategy is supported by the connection provider. :param role: determines if the connection provider should return a reader host or a writer host. :param strategy: the host selection strategy to use. :return: whether the host selection strategy is supported. """ ... def get_host_info_by_strategy( self, hosts: Tuple[HostInfo, ...], role: HostRole, strategy: str, props: Optional[Properties]) -> HostInfo: """ Return a reader or a writer host using the specified strategy. This method should raise an :py:class:`AwsWrapperError` if the specified strategy is unsupported. :param hosts: the list of hosts to select from. :param role: determines if the connection provider should return a reader or a writer host. :param strategy: the host selection strategy to use. :param props: the connection properties. :return: the host selected using the specified strategy. """ ... def connect( self, target_func: Callable, driver_dialect: DriverDialect, database_dialect: DatabaseDialect, host_info: HostInfo, props: Properties) -> Connection: """ Called once per connection that needs to be created. :param target_func: the `Connect` method used by target driver dialect. :param driver_dialect: a dialect that handles target driver specific implementation. :param database_dialect: a dialect that handles database engine specific implementation. :param host_info: the host details for the desired connection. :param props: the connection properties. :return: the established connection resulting from the given connection information. """ ... 
class DriverConnectionProvider(ConnectionProvider):
    """
    :py:class:`ConnectionProvider` that opens connections by calling the target driver's connect function
    directly with the prepared connection properties.
    """

    # Supported host selection strategies, keyed by strategy name. Defined at class level, so the
    # selector instances are shared by every DriverConnectionProvider.
    _accepted_strategies: Dict[str, HostSelector] = \
        {"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}

    def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
        """
        Accept every host/property combination.

        :param host_info: the host to connect to (not inspected).
        :param props: the connection properties (not inspected).
        :return: always `True`.
        """
        return True

    def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
        """
        Indicate whether `strategy` is one of the strategy names registered in `_accepted_strategies`.

        :param role: the requested host role (not inspected).
        :param strategy: the host selection strategy to use.
        :return: whether the host selection strategy is supported.
        """
        return strategy in self._accepted_strategies

    def get_host_info_by_strategy(
            self, hosts: Tuple[HostInfo, ...], role: HostRole, strategy: str, props: Optional[Properties]) -> HostInfo:
        """
        Return a reader or a writer host using the selector registered for `strategy`.

        :param hosts: the hosts to select from.
        :param role: determines if a reader or a writer host should be returned.
        :param strategy: the host selection strategy to use.
        :param props: the connection properties.
        :return: the host selected using the specified strategy.
        :raises AwsWrapperError: if no selector is registered for `strategy`.
        """
        host_selector: Optional[HostSelector] = self._accepted_strategies.get(strategy)
        if host_selector is not None:
            return host_selector.get_host(hosts, role, props)

        raise AwsWrapperError(
            Messages.get_formatted("DriverConnectionProvider.UnsupportedStrategy", strategy))

    def connect(
            self,
            target_func: Callable,
            driver_dialect: DriverDialect,
            database_dialect: DatabaseDialect,
            host_info: HostInfo,
            props: Properties) -> Connection:
        """
        Prepare the connection properties via both dialects and call `target_func` with them.

        :param target_func: the `Connect` method used by target driver dialect.
        :param driver_dialect: a dialect that handles target driver specific implementation.
        :param database_dialect: a dialect that handles database engine specific implementation.
        :param host_info: the host details for the desired connection.
        :param props: the connection properties.
        :return: the connection returned by `target_func`.
        """
        prepared_properties = driver_dialect.prepare_connect_info(host_info, props)
        database_dialect.prepare_conn_props(prepared_properties)
        # Properties are masked before being passed to the logger.
        logger.debug("DriverConnectionProvider.ConnectingToHost", host_info.host,
                     PropertiesUtils.log_properties(PropertiesUtils.mask_properties(prepared_properties)))
        return target_func(**prepared_properties)


class ConnectionProviderManager:
    """
    Chooses between an optional, class-wide non-default :py:class:`ConnectionProvider` and a per-instance
    default provider.
    """

    # Guards access to the class-level `_conn_provider`.
    _lock: Lock = Lock()
    # Optional non-default provider. It is a class attribute, so it is shared by all manager instances.
    _conn_provider: Optional[ConnectionProvider] = None

    # NOTE: the default argument is evaluated once at definition time, so managers created without an
    # explicit provider share one DriverConnectionProvider. That class defines no per-instance state.
    def __init__(self, default_provider: ConnectionProvider = DriverConnectionProvider()):
        """
        :param default_provider: the provider to fall back to when no non-default provider is set or
            the non-default provider declines the request.
        """
        self._default_provider: ConnectionProvider = default_provider

    @property
    def default_provider(self):
        """The fallback :py:class:`ConnectionProvider` given at construction time."""
        return self._default_provider

    @staticmethod
    def set_connection_provider(connection_provider: ConnectionProvider):
        """
        Setter that can optionally be called to request a non-default :py:class:`ConnectionProvider`.
        The requested :py:class:`ConnectionProvider` will be used to establish future connections unless it does not
        support a requested host, in which case the default :py:class:`ConnectionProvider` will be used.
        See :py:meth:`ConnectionProvider.accepts_host_info` for more details.

        :param connection_provider: the :py:class:`ConnectionProvider` to use to establish new connections.
        """
        with ConnectionProviderManager._lock:
            ConnectionProviderManager._conn_provider = connection_provider

    def get_connection_provider(self, host_info: HostInfo, props: Properties) -> ConnectionProvider:
        """
        Get the :py:class:`ConnectionProvider` to use to establish a connection using the given host details and
        properties. If a non-default :py:class:`ConnectionProvider` has been set using
        :py:meth:`ConnectionProviderManager.set_connection_provider` and
        :py:meth:`ConnectionProvider.accepts_host_info` returns `True`, the non-default
        :py:class:`ConnectionProvider` will be returned. Otherwise, the default :py:class:`ConnectionProvider`
        will be returned.

        :param host_info: the host info for the connection that will be established.
        :param props: the connection properties.
        :return: the :py:class:`ConnectionProvider` to use to establish a connection
            using the given host details and properties.
        """
        # Unlocked fast path: nothing to consult when no non-default provider is set.
        if ConnectionProviderManager._conn_provider is None:
            return self.default_provider

        with ConnectionProviderManager._lock:
            # Re-read under the lock; the provider may have been reset since the unlocked check.
            provider = ConnectionProviderManager._conn_provider
            if provider is not None and provider.accepts_host_info(host_info, props):
                return provider

        return self._default_provider

    def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
        """
        Indicates whether the given selection strategy is supported by the non-default connection provider
        (if one is set) or, failing that, by the default connection provider.

        :param role: determines if the connection provider should return a reader host or a writer host.
        :param strategy: the host selection strategy to use.
        :return: whether the host selection strategy is supported.
        """
        accepts_strategy: bool = False
        if ConnectionProviderManager._conn_provider is not None:
            with ConnectionProviderManager._lock:
                # Re-read under the lock: reset_provider() may have cleared the provider between the
                # unlocked check above and acquiring the lock. Dereferencing without this re-check
                # would raise AttributeError on None.
                provider = ConnectionProviderManager._conn_provider
                if provider is not None:
                    accepts_strategy = provider.accepts_strategy(role, strategy)

        if not accepts_strategy:
            accepts_strategy = self._default_provider.accepts_strategy(role, strategy)

        return accepts_strategy

    def get_host_info_by_strategy(
            self, hosts: Tuple[HostInfo, ...], role: HostRole, strategy: str, props: Optional[Properties]) -> HostInfo:
        """
        Return a reader or a writer host using the specified strategy. The non-default provider is used when it
        is set and accepts the strategy; otherwise the request is delegated to the default provider.
        This method should raise an :py:class:`AwsWrapperError` if the specified strategy is unsupported.

        :param hosts: the hosts to select from.
        :param role: determines if the connection provider should return a reader or a writer host.
        :param strategy: the host selection strategy to use.
        :param props: the connection properties.
        :return: the host selected using the specified strategy.
        """
        if ConnectionProviderManager._conn_provider is not None:
            with ConnectionProviderManager._lock:
                # Re-read under the lock; the provider may have been reset since the unlocked check.
                provider = ConnectionProviderManager._conn_provider
                if provider is not None and provider.accepts_strategy(role, strategy):
                    return provider.get_host_info_by_strategy(hosts, role, strategy, props)

        return self._default_provider.get_host_info_by_strategy(hosts, role, strategy, props)

    @staticmethod
    def reset_provider():
        """
        Clears the non-default :py:class:`ConnectionProvider` if it has been set.
        """
        if ConnectionProviderManager._conn_provider is not None:
            with ConnectionProviderManager._lock:
                ConnectionProviderManager._conn_provider = None

    @staticmethod
    def release_resources():
        """
        Releases any resources held by the non-default :py:class:`ConnectionProvider`, if one is set and it
        implements :py:class:`CanReleaseResources`. The default provider is not touched.
        """
        if ConnectionProviderManager._conn_provider is not None:
            with ConnectionProviderManager._lock:
                # isinstance() is False for None, so a concurrent reset is handled here as well.
                provider = ConnectionProviderManager._conn_provider
                if isinstance(provider, CanReleaseResources):
                    provider.release_resources()