in elastic_transport/_async_transport.py [0:0]
def _should_sniff(self, is_initial_sniff: bool) -> bool:
"""Decide if we should sniff or not. _async_init() must be called
before using this function.The async implementation doesn't have a lock.
"""
if is_initial_sniff:
return True
# Only start a new sniff if the previous run is completed.
if self._sniffing_task:
if not self._sniffing_task.done():
return False
# If there was a previous run we collect the sniffing task's
# result as it could have failed with an exception.
self._sniffing_task.result()
return (
self._loop.time() - self._last_sniffed_at
>= self._min_delay_between_sniffing
)