def sniff()

in elastic_transport/_transport.py [0:0]


    def sniff(self, is_initial_sniff: bool = False) -> None:
        """Run the sniff callback and add the nodes it returns to the node pool.

        ``self._should_sniff(is_initial_sniff)`` decides whether a sniff runs.
        When it returns a falsy value this method does nothing. When it
        returns a truthy value the sniffing lock is released on exit no matter
        how the sniff ends (NOTE(review): this presumes ``_should_sniff``
        acquired ``self._sniffing_lock`` before returning True -- confirm
        against its implementation).

        :param is_initial_sniff: Forwarded to ``_should_sniff`` and to the
            sniff callback via ``SniffOptions``. When true, an empty result
            from the callback is treated as an error.
        :raises SniffingError: If ``is_initial_sniff`` is true and the
            callback returned no node configurations.
        :raises Exception: Any error raised while sniffing is logged as a
            warning and re-raised after ``_last_sniffed_at`` is restored.
        """
        # Remembered so a failed sniff can roll the timestamp back.
        previously_sniffed_at = self._last_sniffed_at
        should_sniff = self._should_sniff(is_initial_sniff)
        try:
            if should_sniff:
                _logger.info("Started sniffing for additional nodes")
                self._last_sniffed_at = time.time()

                options = SniffOptions(
                    is_initial_sniff=is_initial_sniff, sniff_timeout=self._sniff_timeout
                )
                assert self._sniff_callback is not None
                node_configs = self._sniff_callback(self, options)
                if not node_configs and is_initial_sniff:
                    raise SniffingError(
                        "No viable nodes were discovered on the initial sniff attempt"
                    )

                prev_node_pool_size = len(self.node_pool)
                for node_config in node_configs:
                    self.node_pool.add(node_config)

                # Work out which sniffed nodes are new and which were already
                # known. The pool only grows by the nodes it did not already
                # hold, so its growth is the number of *new* nodes and the
                # remainder of the sniffed set was already in the pool.
                # (Assumes ``node_pool.add`` ignores configs already present.)
                sniffed_nodes = len(node_configs)
                new_nodes = len(self.node_pool) - prev_node_pool_size
                existing_nodes = sniffed_nodes - new_nodes
                _logger.debug(
                    "Discovered %d nodes during sniffing (%d new nodes, %d already in pool)",
                    sniffed_nodes,
                    new_nodes,
                    existing_nodes,
                )

        # If sniffing failed for any reason we
        # want to allow retrying immediately.
        except Exception as e:
            _logger.warning("Encountered an error during sniffing", exc_info=e)
            self._last_sniffed_at = previously_sniffed_at
            raise

        # If we started a sniff we need to release the lock.
        finally:
            if should_sniff:
                self._sniffing_lock.release()