in aws_advanced_python_wrapper/plugin_service.py [0:0]
def set_current_connection(self, connection: Optional[Connection], host_info: Optional[HostInfo]):
    """Make ``connection``/``host_info`` the current pair and notify plugins.

    Three outcomes are possible:

    * Nothing is stored yet: the arguments are stored as given, the session
      state service is reset, and plugins are notified with
      ``ConnectionEvent.INITIAL_CONNECTION``.
    * Something is stored and ``connection`` is non-``None`` and differs
      (``!=``) from it: the new pair is stored, the current session state is
      applied to the new connection, the in-transaction flag is cleared, and
      plugins are notified with ``ConnectionEvent.CONNECTION_OBJECT_CHANGED``.
      If a transaction was open and ``WrapperProperties.ROLLBACK_ON_SWITCH``
      is enabled, the old connection is rolled back first. Unless the plugins
      answer ``OldConnectionSuggestedAction.PRESERVE`` (or the old connection
      is already closed), the old connection gets its pristine session state
      re-applied and is then closed.
    * Otherwise (``connection`` is ``None`` or does not differ from the stored
      connection): nothing happens.

    Any ``Exception`` raised while rolling back, restoring, or closing the old
    connection is ignored. ``session_state_service.complete()`` always runs
    once ``session_state_service.begin()`` has been called.

    :param connection: the connection to make current.
    :param host_info: the host information stored alongside ``connection``.
    """
    previous = self._current_connection

    if previous is None:
        # Initial connection: just record it and tell the plugins.
        self._current_connection = connection
        self._current_host_info = host_info
        self.session_state_service.reset()
        self._container.plugin_manager.notify_connection_changed({ConnectionEvent.INITIAL_CONNECTION})
        return

    # Nothing to switch to, or the "new" connection is the one already stored.
    # The comparison is deliberately spelled with ``!=`` to keep using the
    # connection's own inequality semantics.
    if connection is None or not (previous != connection):
        return

    def _best_effort(action):
        """Invoke ``action`` and ignore any ``Exception`` it raises."""
        try:
            action()
        except Exception:
            # Ignore any exception.
            pass

    # Capture the flag before it is cleared below.
    was_in_transaction = self._is_in_transaction
    self.session_state_service.begin()
    try:
        self._current_connection = connection
        self._current_host_info = host_info

        self.session_state_service.apply_current_session_state(connection)
        self.update_in_transaction(False)

        if was_in_transaction and WrapperProperties.ROLLBACK_ON_SWITCH.get_bool(self.props):
            _best_effort(lambda: previous.rollback())

        suggested_action = self._container.plugin_manager.notify_connection_changed(
            {ConnectionEvent.CONNECTION_OBJECT_CHANGED})

        if suggested_action != OldConnectionSuggestedAction.PRESERVE:
            if not self.driver_dialect.is_closed(previous):
                # Hand the old connection back in its pristine state, then close it.
                _best_effort(
                    lambda: self.session_state_service.apply_pristine_session_state(previous))
                _best_effort(lambda: previous.close())
    finally:
        self.session_state_service.complete()