def invalidate_all_connections()

in aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py [0:0]


    def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host: Optional[FrozenSet[str]] = None):
        """
        Invalidates all opened connections pointing to the same host in a daemon thread.

       :param host_info: the :py:class:`HostInfo` object containing the URL of the host.
       :param host: the set of aliases representing a specific host.
        """

        if host_info:
            self.invalidate_all_connections(host=frozenset(host_info.as_alias()))
            self.invalidate_all_connections(host=host_info.as_aliases())
            return

        instance_endpoint: Optional[str] = None
        if host is None:
            return

        for instance in host:
            if instance is not None and self._rds_utils.is_rds_instance(self._rds_utils.remove_port(instance)):
                instance_endpoint = instance
                break

        if not instance_endpoint:
            return

        connection_set: Optional[WeakSet] = self._opened_connections.get(instance_endpoint)
        if connection_set is not None:
            self._log_connection_set(instance_endpoint, connection_set)
            self._invalidate_connections(connection_set)