in aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py [0:0]
def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host: Optional[FrozenSet[str]] = None):
    """
    Invalidates all tracked connections that were opened against the given host.

    When ``host_info`` is supplied it takes precedence over ``host``: the method
    re-invokes itself once with the host's primary alias and once with its full
    alias set, then returns. Otherwise the first entry of ``host`` that is
    recognised as an RDS instance endpoint (after stripping the port) is used as
    the key into ``self._opened_connections``; if a connection set is registered
    under that key it is logged and handed to ``self._invalidate_connections``.

    NOTE(review): the invalidation itself is delegated to
    ``_invalidate_connections``; whether it runs in a daemon thread is not
    visible from this method - confirm against that helper.

    :param host_info: the :py:class:`HostInfo` object containing the URL of the host.
    :param host: the set of aliases representing a specific host.
    """
    if host_info:
        # Wrap the alias in a list: frozenset(<str>) would iterate the string and
        # produce a set of single characters, none of which can match an endpoint.
        self.invalidate_all_connections(host=frozenset([host_info.as_alias()]))
        self.invalidate_all_connections(host=host_info.as_aliases())
        return

    if host is None:
        return

    # Pick the first alias that looks like an RDS instance endpoint; the lookup
    # key keeps its original form (including the port, if any).
    instance_endpoint: Optional[str] = next(
        (
            alias
            for alias in host
            if alias is not None
            and self._rds_utils.is_rds_instance(self._rds_utils.remove_port(alias))
        ),
        None,
    )
    if not instance_endpoint:
        return

    connection_set: Optional[WeakSet] = self._opened_connections.get(instance_endpoint)
    if connection_set is not None:
        self._log_connection_set(instance_endpoint, connection_set)
        self._invalidate_connections(connection_set)