in graphlearn_torch/python/distributed/dist_server.py [0:0]
def fetch_one_sampled_message(self, producer_id: int):
r""" Fetch a sampled message from the buffer of a specific sampling
producer with its producer id.
"""
  # Look up the producer; an unknown id is reported but not fatal.
  producer = self._producer_pool.get(producer_id, None)
  if producer is None:
    warnings.warn(f'invalid producer_id {producer_id}')
    return None, False
  # Nothing left to deliver for this producer.
  if producer.is_all_sampling_completed_and_consumed():
    return None, True
  buffer = self._msg_buffer_pool.get(producer_id, None)
  # Poll the message buffer, re-checking for completion on each timeout
  # so the call does not block forever once sampling has finished.
  while True:
    try:
      msg = buffer.recv(timeout_ms=500)
      return msg, False
    except QueueTimeoutError:
      if producer.is_all_sampling_completed():
        return None, True