in dataflux_core/fast_list.py [0:0]
def wait_for_work(self) -> bool:
    """Indefinitely waits for available work and consumes it once available.

    The worker first puts its name on ``send_work_stealing_needed_queue`` and
    ``idle_queue``, then polls ``direct_work_available_queue`` without
    blocking. Before every poll it puts its name on ``heartbeat_queue``; when
    the queue is empty it sleeps 0.1 seconds and tries again.

    A received item is expected to be indexable as ``(start, end)``. An item
    whose first element is None, or an item that is itself None, is treated
    as the shutdown signal.

    Returns:
      True when a new range was received; ``self.start_range`` and
      ``self.end_range`` are updated and the worker name is put on
      ``unidle_queue``. False only in response to the shutdown signal, in
      which case ``(self.name, self.api_call_count)`` is put on
      ``metadata_queue`` and the ranges are left untouched.
    """
    self.send_work_stealing_needed_queue.put(self.name)
    self.idle_queue.put(self.name)
    logging.debug(f"Process {self.name} waiting for work...")

    while True:
        self.heartbeat_queue.put(self.name)
        # Only the non-blocking get can raise queue.Empty, so it is the only
        # statement guarded by the try.
        try:
            new_range = self.direct_work_available_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.1)
            continue
        break

    # Identity comparison against None (never ``!= None``); a bare None item
    # is accepted as a shutdown signal as well instead of raising TypeError.
    if new_range is None or new_range[0] is None:
        logging.debug(f"Process {self.name} didn't receive work")
        # Upon receiving shutdown signal log all relevant metadata.
        md = (self.name, self.api_call_count)
        self.metadata_queue.put(md)
        return False

    # Real work arrived: report that this worker is no longer idle.
    self.unidle_queue.put(self.name)
    self.start_range = new_range[0]
    self.end_range = new_range[1]
    logging.debug(f"Process {self.name} got new range [{self.start_range},"
                  f" {self.end_range}]")
    return True