in elastic_transport/_transport.py [0:0]
def sniff(self, is_initial_sniff: bool = False) -> None:
    """Discover nodes through the sniff callback and add them to the node pool.

    The sniff only runs when ``self._should_sniff(is_initial_sniff)`` returns
    a truthy value; otherwise this method does nothing.

    :param is_initial_sniff: Forwarded to ``_should_sniff`` and to the
        ``SniffOptions`` handed to the sniff callback. When true, a sniff
        that discovers no nodes is treated as an error.
    :raises SniffingError: If the initial sniff discovers no nodes.
    :raises Exception: Any error raised while sniffing is logged as a
        warning and re-raised after ``_last_sniffed_at`` has been restored.
    """
    # Remember the previous timestamp so a failed sniff can be rolled back.
    previously_sniffed_at = self._last_sniffed_at
    # NOTE(review): the release in ``finally`` implies ``_should_sniff``
    # acquires ``_sniffing_lock`` when it returns True -- confirm there.
    should_sniff = self._should_sniff(is_initial_sniff)
    try:
        if should_sniff:
            _logger.info("Started sniffing for additional nodes")
            self._last_sniffed_at = time.time()

            options = SniffOptions(
                is_initial_sniff=is_initial_sniff, sniff_timeout=self._sniff_timeout
            )
            assert self._sniff_callback is not None
            node_configs = self._sniff_callback(self, options)
            if not node_configs and is_initial_sniff:
                raise SniffingError(
                    "No viable nodes were discovered on the initial sniff attempt"
                )

            prev_node_pool_size = len(self.node_pool)
            for node_config in node_configs:
                self.node_pool.add(node_config)

            # The pool's growth is the number of nodes that were actually
            # new; every other sniffed node was already known. (Assumes
            # ``node_pool.add`` ignores configs it already holds.)
            sniffed_nodes = len(node_configs)
            new_nodes = len(self.node_pool) - prev_node_pool_size
            existing_nodes = sniffed_nodes - new_nodes
            _logger.debug(
                "Discovered %d nodes during sniffing (%d new nodes, %d already in pool)",
                sniffed_nodes,
                new_nodes,
                existing_nodes,
            )

    # If sniffing failed for any reason we
    # want to allow retrying immediately.
    except Exception as e:
        _logger.warning("Encountered an error during sniffing", exc_info=e)
        self._last_sniffed_at = previously_sniffed_at
        raise

    # If we started a sniff we need to release the lock.
    finally:
        if should_sniff:
            self._sniffing_lock.release()