def __read_next()

in datahub/client/consumer/shard_reader.py [0:0]


    def __read_next(self, timeout):
        """Return the next batch of fetched records, waiting up to a deadline.

        Runs entirely under ``self._read_lock``. While the reader is not
        closed and the timer has not expired, each iteration either consumes
        one completed fetch from ``self._cache_record_queue`` (when
        ``self._remain_records`` is positive) or submits a fetch task and
        waits on ``self._fetch_lock`` until ``self.__not_empty`` holds or the
        remaining time runs out.

        Args:
            timeout: Requested wait budget. Values below
                ``Constant.MIN_TIMEOUT_WAIT_FETCH`` are raised to that
                minimum. The unit is whatever ``Timer`` expects.

        Returns:
            ``complete_fetch.records`` of the first ``CompleteType.T_NORMAL``
            fetch consumed, or ``None`` when the reader is closed or the
            timer expires before such a fetch is seen.

        Raises:
            Exception: ``complete_fetch.exception`` when the consumed fetch
                has type ``CompleteType.T_EXCEPTION``. Errors raised by
                ``wait_expire`` or ``wait_for`` propagate unchanged.
        """
        timer = Timer(max(timeout, Constant.MIN_TIMEOUT_WAIT_FETCH))
        with self._read_lock:
            while not self._closed and not timer.is_expired():
                if self._remain_records.value > 0:
                    complete_fetch = self._cache_record_queue.get()
                    # NOTE(review): reading ``value`` and then calling
                    # ``get_and_set`` is two separate steps; if another thread
                    # updates the counter in between, that update is
                    # overwritten. Confirm against the counter's API whether an
                    # atomic decrement is available.
                    self._remain_records.get_and_set(self._remain_records.value - 1)

                    if complete_fetch.complete_type == CompleteType.T_NORMAL:
                        return complete_fetch.records
                    if complete_fetch.complete_type == CompleteType.T_EXCEPTION:
                        raise complete_fetch.exception

                    # Any other completion type: block until the fetch's own
                    # timer expires, then go round the loop again.
                    complete_fetch.timer.wait_expire()
                else:
                    # Nothing cached: request another fetch, then wait for
                    # ``__not_empty`` to hold or for the time left until the
                    # deadline to elapse.
                    self._message_reader.send_task(self.__gen_next_fetch_task, self.__deal_with_task)
                    with self._fetch_lock:
                        self._fetch_lock.wait_for(
                            self.__not_empty,
                            timer.deadline_time - Timer.get_curr_time(),
                        )
        # Closed or timed out without a normal fetch.
        return None