aws_advanced_python_wrapper/database_dialect.py (545 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import (TYPE_CHECKING, Callable, ClassVar, Dict, Optional,
Protocol, Tuple, runtime_checkable)
from aws_advanced_python_wrapper.driver_info import DriverInfo
if TYPE_CHECKING:
from aws_advanced_python_wrapper.pep249 import Connection
from .driver_dialect import DriverDialect
from .exception_handling import ExceptionHandler
from abc import abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError
from contextlib import closing
from enum import Enum, auto
from aws_advanced_python_wrapper.errors import (AwsWrapperError,
QueryTimeoutError)
from aws_advanced_python_wrapper.host_list_provider import (
ConnectionStringHostListProvider, MultiAzHostListProvider,
RdsHostListProvider)
from aws_advanced_python_wrapper.hostinfo import HostInfo
from aws_advanced_python_wrapper.utils.decorators import \
preserve_transaction_status_with_timeout
from aws_advanced_python_wrapper.utils.log import Logger
from aws_advanced_python_wrapper.utils.properties import (Properties,
PropertiesUtils,
WrapperProperties)
from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils
from .driver_dialect_codes import DriverDialectCodes
from .utils.cache_map import CacheMap
from .utils.messages import Messages
from .utils.utils import Utils
logger = Logger(__name__)
class DialectCode(Enum):
# https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/multi-az-db-clusters-concepts.html
MULTI_AZ_MYSQL = "multi-az-mysql"
AURORA_MYSQL = "aurora-mysql"
RDS_MYSQL = "rds-mysql"
MYSQL = "mysql"
MULTI_AZ_PG = "multi-az-pg"
AURORA_PG = "aurora-pg"
RDS_PG = "rds-pg"
PG = "pg"
CUSTOM = "custom"
UNKNOWN = "unknown"
@staticmethod
def from_string(value: str) -> DialectCode:
try:
return DialectCode(value)
except ValueError:
raise AwsWrapperError(Messages.get_formatted("DialectCode.InvalidStringValue", value))
class TargetDriverType(Enum):
MYSQL = auto()
POSTGRES = auto()
CUSTOM = auto()
@runtime_checkable
class TopologyAwareDatabaseDialect(Protocol):
_TOPOLOGY_QUERY: str
_HOST_ID_QUERY: str
_IS_READER_QUERY: str
@property
def topology_query(self) -> str:
return self._TOPOLOGY_QUERY
@property
def host_id_query(self) -> str:
return self._HOST_ID_QUERY
@property
def is_reader_query(self) -> str:
return self._IS_READER_QUERY
class DatabaseDialect(Protocol):
"""
Database dialects help the AWS Advanced Python Driver determine what kind of underlying database is being used,
and configure details unique to specific databases.
"""
@property
@abstractmethod
def default_port(self) -> int:
...
@property
@abstractmethod
def host_alias_query(self) -> str:
...
@property
@abstractmethod
def server_version_query(self) -> str:
...
@property
@abstractmethod
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
...
@property
@abstractmethod
def exception_handler(self) -> Optional[ExceptionHandler]:
...
@abstractmethod
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
...
@abstractmethod
def get_host_list_provider_supplier(self) -> Callable:
...
@abstractmethod
def prepare_conn_props(self, props: Properties):
...
class DatabaseDialectProvider(Protocol):
def get_dialect(self, driver_dialect: str, props: Properties) -> Optional[DatabaseDialect]:
"""
Returns the dialect identified by analyzing the AwsWrapperProperties.DIALECT property (if set) or the target
driver method
"""
...
def query_for_dialect(self, url: str, host_info: Optional[HostInfo], conn: Connection,
driver_dialect: DriverDialect) -> Optional[DatabaseDialect]:
"""Returns the dialect identified by querying the database to identify the engine type"""
...
class MysqlDatabaseDialect(DatabaseDialect):
_DIALECT_UPDATE_CANDIDATES: Tuple[DialectCode, ...] = (
DialectCode.AURORA_MYSQL, DialectCode.MULTI_AZ_MYSQL, DialectCode.RDS_MYSQL)
_exception_handler: Optional[ExceptionHandler] = None
@property
def default_port(self) -> int:
return 3306
@property
def host_alias_query(self) -> str:
return "SELECT CONCAT(@@hostname, ':', @@port)"
@property
def server_version_query(self) -> str:
return "SHOW VARIABLES LIKE 'version_comment'"
@property
def exception_handler(self) -> Optional[ExceptionHandler]:
if MysqlDatabaseDialect._exception_handler is None:
MysqlDatabaseDialect._exception_handler = Utils.initialize_class(
"aws_advanced_python_wrapper.utils.mysql_exception_handler.MySQLExceptionHandler")
return MysqlDatabaseDialect._exception_handler
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return MysqlDatabaseDialect._DIALECT_UPDATE_CANDIDATES
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute(self.server_version_query)
for record in cursor:
for column_value in record:
if "mysql" in column_value.lower():
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: ConnectionStringHostListProvider(provider_service, props)
def prepare_conn_props(self, props: Properties):
pass
class PgDatabaseDialect(DatabaseDialect):
_DIALECT_UPDATE_CANDIDATES: Tuple[DialectCode, ...] = (
DialectCode.AURORA_PG, DialectCode.MULTI_AZ_PG, DialectCode.RDS_PG)
_exception_handler: Optional[ExceptionHandler] = None
@property
def default_port(self) -> int:
return 5432
@property
def host_alias_query(self) -> str:
return "SELECT CONCAT(inet_server_addr(), ':', inet_server_port())"
@property
def server_version_query(self) -> str:
return "SELECT 'version', VERSION()"
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return PgDatabaseDialect._DIALECT_UPDATE_CANDIDATES
@property
def exception_handler(self) -> Optional[ExceptionHandler]:
if PgDatabaseDialect._exception_handler is None:
PgDatabaseDialect._exception_handler = Utils.initialize_class(
"aws_advanced_python_wrapper.utils.pg_exception_handler.SingleAzPgExceptionHandler")
return PgDatabaseDialect._exception_handler
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute('SELECT 1 FROM pg_proc LIMIT 1')
if cursor.fetchone() is not None:
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: ConnectionStringHostListProvider(provider_service, props)
def prepare_conn_props(self, props: Properties):
pass
class RdsMysqlDialect(MysqlDatabaseDialect):
_DIALECT_UPDATE_CANDIDATES = (DialectCode.AURORA_MYSQL, DialectCode.MULTI_AZ_MYSQL)
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute(self.server_version_query)
for record in cursor:
for column_value in record:
if "source distribution" in column_value.lower():
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return RdsMysqlDialect._DIALECT_UPDATE_CANDIDATES
class RdsPgDialect(PgDatabaseDialect):
_EXTENSIONS_QUERY = ("SELECT (setting LIKE '%rds_tools%') AS rds_tools, "
"(setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils "
"FROM pg_settings "
"WHERE name='rds.extensions'")
_DIALECT_UPDATE_CANDIDATES = (DialectCode.AURORA_PG, DialectCode.MULTI_AZ_PG)
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
if not super().is_dialect(conn, driver_dialect):
return False
try:
with closing(conn.cursor()) as cursor:
cursor.execute(RdsPgDialect._EXTENSIONS_QUERY)
for row in cursor:
rds_tools = bool(row[0])
aurora_utils = bool(row[1])
logger.debug(
"RdsPgDialect.RdsToolsAuroraUtils", str(rds_tools), str(aurora_utils))
if rds_tools and not aurora_utils:
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return RdsPgDialect._DIALECT_UPDATE_CANDIDATES
class AuroraMysqlDialect(MysqlDatabaseDialect, TopologyAwareDatabaseDialect):
_DIALECT_UPDATE_CANDIDATES = (DialectCode.MULTI_AZ_MYSQL,)
_TOPOLOGY_QUERY = ("SELECT SERVER_ID, CASE WHEN SESSION_ID = 'MASTER_SESSION_ID' THEN TRUE ELSE FALSE END, "
"CPU, REPLICA_LAG_IN_MILLISECONDS, LAST_UPDATE_TIMESTAMP "
"FROM information_schema.replica_host_status "
"WHERE time_to_sec(timediff(now(), LAST_UPDATE_TIMESTAMP)) <= 300 "
"OR SESSION_ID = 'MASTER_SESSION_ID' ")
_HOST_ID_QUERY = "SELECT @@aurora_server_id"
_IS_READER_QUERY = "SELECT @@innodb_read_only"
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return AuroraMysqlDialect._DIALECT_UPDATE_CANDIDATES
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute("SHOW VARIABLES LIKE 'aurora_version'")
# If variable with such a name is presented then it means it's an Aurora cluster
if cursor.fetchone() is not None:
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: RdsHostListProvider(provider_service, props)
class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
_DIALECT_UPDATE_CANDIDATES: Tuple[DialectCode, ...] = (DialectCode.MULTI_AZ_PG,)
_EXTENSIONS_QUERY = "SELECT (setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils " \
"FROM pg_settings WHERE name='rds.extensions'"
_HAS_TOPOLOGY_QUERY = "SELECT 1 FROM aurora_replica_status() LIMIT 1"
_TOPOLOGY_QUERY = \
("SELECT SERVER_ID, CASE WHEN SESSION_ID = 'MASTER_SESSION_ID' THEN TRUE ELSE FALSE END, "
"CPU, COALESCE(REPLICA_LAG_IN_MSEC, 0), LAST_UPDATE_TIMESTAMP "
"FROM aurora_replica_status() "
"WHERE EXTRACT(EPOCH FROM(NOW() - LAST_UPDATE_TIMESTAMP)) <= 300 OR SESSION_ID = 'MASTER_SESSION_ID' "
"OR LAST_UPDATE_TIMESTAMP IS NULL")
_HOST_ID_QUERY = "SELECT aurora_db_instance_identifier()"
_IS_READER_QUERY = "SELECT pg_is_in_recovery()"
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return AuroraPgDialect._DIALECT_UPDATE_CANDIDATES
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
if not super().is_dialect(conn, driver_dialect):
return False
has_extensions: bool = False
has_topology: bool = False
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute(self._EXTENSIONS_QUERY)
row = cursor.fetchone()
if row and bool(row[0]):
logger.debug("AuroraPgDialect.HasExtensionsTrue")
has_extensions = True
with closing(conn.cursor()) as cursor:
cursor.execute(self._HAS_TOPOLOGY_QUERY)
if cursor.fetchone() is not None:
logger.debug("AuroraPgDialect.HasTopologyTrue")
has_topology = True
return has_extensions and has_topology
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: RdsHostListProvider(provider_service, props)
class MultiAzMysqlDialect(MysqlDatabaseDialect, TopologyAwareDatabaseDialect):
_TOPOLOGY_QUERY = "SELECT id, endpoint, port FROM mysql.rds_topology"
_WRITER_HOST_QUERY = "SHOW REPLICA STATUS"
_WRITER_HOST_COLUMN_INDEX = 39
_HOST_ID_QUERY = "SELECT @@server_id"
_IS_READER_QUERY = "SELECT @@read_only"
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return None
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute(MultiAzMysqlDialect._TOPOLOGY_QUERY)
records = cursor.fetchall()
if records is not None and len(records) > 0:
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: MultiAzHostListProvider(
provider_service,
props,
self._TOPOLOGY_QUERY,
self._HOST_ID_QUERY,
self._IS_READER_QUERY,
self._WRITER_HOST_QUERY,
self._WRITER_HOST_COLUMN_INDEX)
def prepare_conn_props(self, props: Properties):
# These props are added for RDS metrics purposes, they are not required for functional correctness.
# The "conn_attrs" property value is specified as a dict.
extra_conn_attrs = {
"python_wrapper_name": "aws_python_driver",
"python_wrapper_version": DriverInfo.DRIVER_VERSION}
conn_attrs = props.get("conn_attrs")
if conn_attrs is None:
props["conn_attrs"] = extra_conn_attrs
else:
props["conn_attrs"].update(extra_conn_attrs)
class MultiAzPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
# The driver name passed to show_topology is used for RDS metrics purposes.
# It is not required for functional correctness.
_TOPOLOGY_QUERY = \
f"SELECT id, endpoint, port FROM rds_tools.show_topology('aws_python_driver-{DriverInfo.DRIVER_VERSION}')"
_WRITER_HOST_QUERY = \
"SELECT multi_az_db_cluster_source_dbi_resource_id FROM rds_tools.multi_az_db_cluster_source_dbi_resource_id()"
_HOST_ID_QUERY = "SELECT dbi_resource_id FROM rds_tools.dbi_resource_id()"
_IS_READER_QUERY = "SELECT pg_is_in_recovery()"
_exception_handler: Optional[ExceptionHandler] = None
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return None
@property
def exception_handler(self) -> Optional[ExceptionHandler]:
if MultiAzPgDialect._exception_handler is None:
MultiAzPgDialect._exception_handler = Utils.initialize_class(
"aws_advanced_python_wrapper.utils.pg_exception_handler.MultiAzPgExceptionHandler")
return MultiAzPgDialect._exception_handler
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute(MultiAzPgDialect._WRITER_HOST_QUERY)
if cursor.fetchone() is not None:
return True
except Exception:
if not initial_transaction_status and driver_dialect.is_in_transaction(conn):
conn.rollback()
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: MultiAzHostListProvider(
provider_service,
props,
self._TOPOLOGY_QUERY,
self._HOST_ID_QUERY,
self._IS_READER_QUERY,
self._WRITER_HOST_QUERY)
class UnknownDatabaseDialect(DatabaseDialect):
_DIALECT_UPDATE_CANDIDATES: Optional[Tuple[DialectCode, ...]] = \
(DialectCode.MYSQL,
DialectCode.PG,
DialectCode.RDS_MYSQL,
DialectCode.RDS_PG,
DialectCode.AURORA_MYSQL,
DialectCode.AURORA_PG,
DialectCode.MULTI_AZ_MYSQL,
DialectCode.MULTI_AZ_PG)
@property
def default_port(self) -> int:
return HostInfo.NO_PORT
@property
def host_alias_query(self) -> str:
return ""
@property
def server_version_query(self) -> str:
return ""
@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
return UnknownDatabaseDialect._DIALECT_UPDATE_CANDIDATES
@property
def exception_handler(self) -> Optional[ExceptionHandler]:
return None
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
return False
def get_host_list_provider_supplier(self) -> Callable:
return lambda provider_service, props: ConnectionStringHostListProvider(provider_service, props)
def prepare_conn_props(self, props: Properties):
pass
class DatabaseDialectManager(DatabaseDialectProvider):
_ENDPOINT_CACHE_EXPIRATION_NS = 30 * 60_000_000_000 # 30 minutes
_known_endpoint_dialects: CacheMap[str, DialectCode] = CacheMap()
_custom_dialect: Optional[DatabaseDialect] = None
_executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="DatabaseDialectManagerExecutor")
_known_dialects_by_code: Dict[DialectCode, DatabaseDialect] = {
DialectCode.MYSQL: MysqlDatabaseDialect(),
DialectCode.RDS_MYSQL: RdsMysqlDialect(),
DialectCode.AURORA_MYSQL: AuroraMysqlDialect(),
DialectCode.MULTI_AZ_MYSQL: MultiAzMysqlDialect(),
DialectCode.PG: PgDatabaseDialect(),
DialectCode.RDS_PG: RdsPgDialect(),
DialectCode.AURORA_PG: AuroraPgDialect(),
DialectCode.MULTI_AZ_PG: MultiAzPgDialect(),
DialectCode.UNKNOWN: UnknownDatabaseDialect()
}
def __init__(self, props: Properties, rds_helper: Optional[RdsUtils] = None):
self._props: Properties = props
self._rds_helper: RdsUtils = rds_helper if rds_helper else RdsUtils()
self._can_update: bool = False
self._dialect: DatabaseDialect = UnknownDatabaseDialect()
self._dialect_code: DialectCode = DialectCode.UNKNOWN
@staticmethod
def get_custom_dialect():
return DatabaseDialectManager._custom_dialect
@staticmethod
def set_custom_dialect(dialect: DatabaseDialect):
DatabaseDialectManager._custom_dialect = dialect
@staticmethod
def reset_custom_dialect():
DatabaseDialectManager._custom_dialect = None
def reset_endpoint_cache(self):
DatabaseDialectManager._known_endpoint_dialects.clear()
def get_dialect(self, driver_dialect: str, props: Properties) -> DatabaseDialect:
self._can_update = False
if self._custom_dialect is not None:
self._dialect_code = DialectCode.CUSTOM
self._dialect = self._custom_dialect
self._log_current_dialect()
return self._dialect
user_dialect_setting: Optional[str] = WrapperProperties.DIALECT.get(props)
url = PropertiesUtils.get_url(props)
if user_dialect_setting is None:
dialect_code = DatabaseDialectManager._known_endpoint_dialects.get(url)
else:
dialect_code = DialectCode.from_string(user_dialect_setting)
if dialect_code is not None:
dialect: Optional[DatabaseDialect] = DatabaseDialectManager._known_dialects_by_code.get(dialect_code)
if dialect:
self._dialect_code = dialect_code
self._dialect = dialect
self._log_current_dialect()
return dialect
else:
raise AwsWrapperError(
Messages.get_formatted("DatabaseDialectManager.UnknownDialectCode", str(dialect_code)))
host: str = props["host"]
target_driver_type: TargetDriverType = self._get_target_driver_type(driver_dialect)
if target_driver_type is TargetDriverType.MYSQL:
rds_type = self._rds_helper.identify_rds_type(host)
if rds_type.is_rds_cluster:
self._can_update = True
self._dialect_code = DialectCode.AURORA_MYSQL
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.AURORA_MYSQL]
return self._dialect
if rds_type.is_rds:
self._can_update = True
self._dialect_code = DialectCode.RDS_MYSQL
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.RDS_MYSQL]
self._log_current_dialect()
return self._dialect
self._can_update = True
self._dialect_code = DialectCode.MYSQL
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.MYSQL]
self._log_current_dialect()
return self._dialect
if target_driver_type is TargetDriverType.POSTGRES:
rds_type = self._rds_helper.identify_rds_type(host)
if rds_type.is_rds_cluster:
self._can_update = True
self._dialect_code = DialectCode.AURORA_PG
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.AURORA_PG]
return self._dialect
if rds_type.is_rds:
self._can_update = True
self._dialect_code = DialectCode.RDS_PG
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.RDS_PG]
self._log_current_dialect()
return self._dialect
self._can_update = True
self._dialect_code = DialectCode.PG
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.PG]
self._log_current_dialect()
return self._dialect
self._can_update = True
self._dialect_code = DialectCode.UNKNOWN
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.UNKNOWN]
self._log_current_dialect()
return self._dialect
def _get_target_driver_type(self, driver_dialect: str) -> TargetDriverType:
if driver_dialect == DriverDialectCodes.PSYCOPG:
return TargetDriverType.POSTGRES
if driver_dialect == DriverDialectCodes.MYSQL_CONNECTOR_PYTHON:
return TargetDriverType.MYSQL
return TargetDriverType.CUSTOM
def query_for_dialect(self, url: str, host_info: Optional[HostInfo], conn: Connection,
driver_dialect: DriverDialect) -> DatabaseDialect:
if not self._can_update:
self._log_current_dialect()
return self._dialect
dialect_candidates = self._dialect.dialect_update_candidates if self._dialect is not None else None
if dialect_candidates is not None:
for dialect_code in dialect_candidates:
dialect_candidate = DatabaseDialectManager._known_dialects_by_code.get(dialect_code)
if dialect_candidate is None:
raise AwsWrapperError(Messages.get_formatted("DatabaseDialectManager.UnknownDialectCode", dialect_code))
timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get(self._props)
try:
cursor_execute_func_with_timeout = preserve_transaction_status_with_timeout(
DatabaseDialectManager._executor,
timeout_sec,
driver_dialect,
conn)(dialect_candidate.is_dialect)
is_dialect = cursor_execute_func_with_timeout(conn, driver_dialect)
except TimeoutError as e:
raise QueryTimeoutError("DatabaseDialectManager.QueryForDialectTimeout") from e
if not is_dialect:
continue
self._can_update = False
self._dialect_code = dialect_code
self._dialect = dialect_candidate
DatabaseDialectManager._known_endpoint_dialects.put(url, dialect_code,
DatabaseDialectManager._ENDPOINT_CACHE_EXPIRATION_NS)
if host_info is not None:
DatabaseDialectManager._known_endpoint_dialects.put(
host_info.url, dialect_code, DatabaseDialectManager._ENDPOINT_CACHE_EXPIRATION_NS)
self._log_current_dialect()
return self._dialect
if self._dialect_code is None or self._dialect_code == DialectCode.UNKNOWN:
raise AwsWrapperError(Messages.get("DatabaseDialectManager.UnknownDialect"))
self._can_update = False
DatabaseDialectManager._known_endpoint_dialects.put(url, self._dialect_code,
DatabaseDialectManager._ENDPOINT_CACHE_EXPIRATION_NS)
if host_info is not None:
DatabaseDialectManager._known_endpoint_dialects.put(
host_info.url, self._dialect_code, DatabaseDialectManager._ENDPOINT_CACHE_EXPIRATION_NS)
self._log_current_dialect()
return self._dialect
def _log_current_dialect(self):
dialect_class = "<null>" if self._dialect is None else type(self._dialect).__name__
logger.debug("DatabaseDialectManager.CurrentDialectCanUpdate", self._dialect_code, dialect_class, self._can_update)