in datahub/client/consumer/shard_group_reader.py [0:0]
def read(self, shard_id, time_out):
if self._closed:
self._logger.warning("ShardGroupReader closed when read. key: {}".format(self._coordinator.uniq_key))
raise DatahubException("ShardGroupReader closed when read")
record = None
timer = Timer(time_out)
while not self._closed and record is None and not timer.is_expired():
if self._coordinator.waiting_shard_assign():
timer.wait_expire(Constant.DELAY_TIMEOUT_FOR_NOT_READY)
else:
self._coordinator.update_shard_info()
with self._lock:
reader = self.__get_next_reader(shard_id)
if reader is None:
timer.wait_expire(Constant.DELAY_TIMEOUT_FOR_NOT_READY)
else:
record = self.__read_by_reader(reader)
return record