def fetch_one_sampled_message()

in graphlearn_torch/python/distributed/dist_server.py [0:0]


  def fetch_one_sampled_message(self, producer_id: int):
    r""" Fetch a sampled message from the buffer of a specific sampling
    producer with its producer id.

    Args:
      producer_id (int): Key used to look up the producer in
        ``self._producer_pool`` and its message buffer in
        ``self._msg_buffer_pool``.

    Returns:
      A ``(msg, finished)`` tuple:

      * ``(None, False)`` if ``producer_id`` is not registered in the
        producer pool (a warning is emitted);
      * ``(None, True)`` if the producer reports that all sampling is
        completed and consumed, or reports that all sampling is completed
        after a receive attempt timed out;
      * ``(msg, False)`` once a message has been received from the buffer.
    """
    producer = self._producer_pool.get(producer_id, None)
    if producer is None:
      # f-string so the offending id actually appears in the warning text.
      warnings.warn(f'invalid producer_id {producer_id}')
      return None, False
    if producer.is_all_sampling_completed_and_consumed():
      return None, True
    # NOTE(review): assumes a buffer is registered for every pooled producer;
    # a missing entry would surface as an AttributeError on ``recv`` below.
    buffer = self._msg_buffer_pool.get(producer_id, None)
    while True:
      try:
        msg = buffer.recv(timeout_ms=500)
        return msg, False
      except QueueTimeoutError:
        # Nothing arrived within the timeout: stop once the producer says
        # sampling is completed, otherwise poll the buffer again.
        if producer.is_all_sampling_completed():
          return None, True