def read()

in datahub/client/consumer/shard_group_reader.py [0:0]


    def read(self, shard_id, time_out):
        """Poll for the next record of ``shard_id`` until one is available.

        The loop keeps going while this reader is open, no record has been
        obtained yet, and the timer built from ``time_out`` has not expired.

        Args:
            shard_id: Identifier forwarded to the internal reader lookup.
            time_out: Budget handed to ``Timer``; units are whatever ``Timer``
                expects (NOTE(review): confirm against its definition).

        Returns:
            The value produced by the selected reader, or ``None`` when the
            timer expires or the reader is closed while polling.

        Raises:
            DatahubException: If the reader is already closed on entry.
        """
        if self._closed:
            closed_msg = "ShardGroupReader closed when read. key: {}".format(self._coordinator.uniq_key)
            self._logger.warning(closed_msg)
            raise DatahubException("ShardGroupReader closed when read")

        deadline = Timer(time_out)
        fetched = None
        # Same short-circuit order as a chained "and": closed flag first,
        # then the record check, and the timer is consulted last.
        while not (self._closed or fetched is not None or deadline.is_expired()):
            if self._coordinator.waiting_shard_assign():
                # Coordinator reports it is still waiting for shard assignment.
                deadline.wait_expire(Constant.DELAY_TIMEOUT_FOR_NOT_READY)
                continue

            self._coordinator.update_shard_info()

            # Only the reader lookup is done under the lock; the read itself
            # happens after the lock has been released.
            with self._lock:
                chosen_reader = self.__get_next_reader(shard_id)

            if chosen_reader is None:
                # No reader for this shard yet; wait, then re-evaluate.
                deadline.wait_expire(Constant.DELAY_TIMEOUT_FOR_NOT_READY)
                continue

            fetched = self.__read_by_reader(chosen_reader)

        return fetched