in datahub/client/consumer/shard_reader.py [0:0]
def __read_next(self, timeout):
timer = Timer(max(timeout, Constant.MIN_TIMEOUT_WAIT_FETCH))
with self._read_lock:
record = None
while not self._closed and record is None and not timer.is_expired():
if self._remain_records.value > 0:
complete_fetch = self._cache_record_queue.get()
self._remain_records.get_and_set(self._remain_records.value-1)
if complete_fetch.complete_type == CompleteType.T_NORMAL:
return complete_fetch.records
elif complete_fetch.complete_type == CompleteType.T_EXCEPTION:
raise complete_fetch.exception
else:
try:
complete_fetch.timer.wait_expire()
except Exception as e:
raise e
else:
self._message_reader.send_task(self.__gen_next_fetch_task, self.__deal_with_task)
with self._fetch_lock:
try:
self._fetch_lock.wait_for(self.__not_empty, timer.deadline_time-Timer.get_curr_time())
except Exception as e:
raise e
return record