aws_advanced_python_wrapper/states/session_state_service.py (145 lines of code) (raw):

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations from typing import TYPE_CHECKING, Callable, Optional, Protocol if TYPE_CHECKING: from aws_advanced_python_wrapper.plugin_service import PluginService from aws_advanced_python_wrapper.pep249 import Connection from aws_advanced_python_wrapper.errors import AwsWrapperError from aws_advanced_python_wrapper.states.session_state import SessionState from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) logger = Logger(__name__) class SessionStateService(Protocol): def get_autocommit(self) -> Optional[bool]: ... def set_autocommit(self, autocommit: bool): ... def setup_pristine_autocommit(self, autocommit: Optional[bool] = None): ... def get_readonly(self): ... def set_read_only(self, readonly: bool): ... def setup_pristine_readonly(self, readonly: Optional[bool] = None): ... def reset(self): ... def begin(self): """ Begin session transfer process """ ... def complete(self): """ Complete session transfer process. This method should be called despite whether session transfer is successful or not. """ ... def apply_current_session_state(self, new_connection: Connection): ... def apply_pristine_session_state(self, new_connection: Connection): ... class SessionStateTransferHandlers: reset_session_state_on_close_callable: Optional[Callable] = None transfer_session_state_on_switch_callable: Optional[Callable] = None @staticmethod def set_reset_session_state_on_close_func(func: Callable): SessionStateTransferHandlers.reset_session_state_on_close_callable = func @staticmethod def clear_reset_session_state_on_close_func(): SessionStateTransferHandlers.reset_session_state_on_close_callable = None @staticmethod def get_reset_session_state_on_close_func() -> Optional[Callable]: return SessionStateTransferHandlers.reset_session_state_on_close_callable @staticmethod def get_transfer_session_state_on_switch_func() -> Optional[Callable]: return SessionStateTransferHandlers.transfer_session_state_on_switch_callable @staticmethod def set_transfer_session_state_on_switch_func(func: Callable): SessionStateTransferHandlers.transfer_session_state_on_switch_callable = func @staticmethod def reset_transfer_session_state_on_switch_func(): SessionStateTransferHandlers.reset_transfer_session_state_on_switch_callable = None class SessionStateServiceImpl(SessionStateService): def __init__(self, plugin_service: PluginService, props: Properties): self._session_state: SessionState = SessionState() self._copy_session_state: Optional[SessionState] = None self._plugin_service: PluginService = plugin_service self._props: Properties = props def log_current_state(self): logger.debug(f"Current session state: \n{self._session_state}") def _transfer_state_enabled_setting(self) -> bool: return WrapperProperties.TRANSFER_SESSION_STATE_ON_SWITCH.get_bool(self._props) def _reset_state_enabled_setting(self) -> bool: return WrapperProperties.RESET_SESSION_STATE_ON_CLOSE.get_bool(self._props) def get_autocommit(self) -> Optional[bool]: return self._session_state.auto_commit.value def set_autocommit(self, autocommit: bool): if not self._transfer_state_enabled_setting(): return self._session_state.auto_commit.value = autocommit def setup_pristine_autocommit(self, autocommit: Optional[bool] = None): if not self._transfer_state_enabled_setting(): return if self._session_state.auto_commit.pristine_value is not None: return if autocommit is None and self._plugin_service.current_connection is not None: autocommit = self._plugin_service.driver_dialect.get_autocommit(self._plugin_service.current_connection) self._session_state.auto_commit.pristine_value = autocommit self.log_current_state() def get_readonly(self): return self._session_state.readonly.value def set_read_only(self, readonly: bool): if not self._transfer_state_enabled_setting(): return self._session_state.readonly.value = readonly def setup_pristine_readonly(self, readonly: Optional[bool] = None): if not self._transfer_state_enabled_setting(): return if self._session_state.readonly.pristine_value is not None: return if readonly is None and self._plugin_service.current_connection is not None: readonly = self._plugin_service.driver_dialect.is_read_only(self._plugin_service.current_connection) self._session_state.readonly.pristine_value = readonly self.log_current_state() def reset(self): self._session_state.auto_commit.reset() self._session_state.readonly.reset() def begin(self): self.log_current_state() if not self._transfer_state_enabled_setting() and not self._reset_state_enabled_setting(): return if self._copy_session_state is not None: raise AwsWrapperError("Previous session state transfer is not completed.") self._copy_session_state = self._session_state.copy() def complete(self): self._copy_session_state = None def apply_current_session_state(self, new_connection: Connection): if not self._transfer_state_enabled_setting(): return func: Optional[Callable] = SessionStateTransferHandlers.get_transfer_session_state_on_switch_func() if func is not None: is_handled: bool = func(self._session_state, new_connection) if is_handled: # Custom function has handled session transfer return if self._session_state.auto_commit.value is not None: self._session_state.auto_commit.reset_pristine_value() self.setup_pristine_autocommit() self._plugin_service.driver_dialect.set_autocommit(new_connection, self._session_state.auto_commit.value) if self._session_state.readonly.value is not None: self._session_state.readonly.reset_pristine_value() self.setup_pristine_readonly() self._plugin_service.driver_dialect.set_read_only(new_connection, self._session_state.readonly.value) def apply_pristine_session_state(self, new_connection: Connection): if not self._transfer_state_enabled_setting(): return func: Optional[Callable] = SessionStateTransferHandlers.get_transfer_session_state_on_switch_func() if func is not None: is_handled: bool = func(self._session_state, new_connection) if is_handled: # Custom function has handled session transfer return if self._copy_session_state is None: return if self._copy_session_state.auto_commit.can_restore_pristine(): try: self._plugin_service.driver_dialect.set_autocommit(new_connection, self._copy_session_state.auto_commit.pristine_value) except Exception: # Ignore any exception. pass if self._copy_session_state.readonly.can_restore_pristine(): try: self._plugin_service.driver_dialect.set_read_only(new_connection, self._copy_session_state.readonly.pristine_value) except Exception: # Ignore any exception. pass