public ConsumerRecord makeNext()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/ConsumerIterator.java [56:86]


  public ConsumerRecord makeNext() {
    Iterator<ConsumerRecord> localCurrent = current.get();
    // if we don't have an iterator, get one
    if (localCurrent == null || !localCurrent.hasNext() || consumedEndBounded()) {
      FetchedDataChunk currentChuck;
      try {
        if (consumerTimeoutMs < 0) {
          currentChuck = channel.take();
        } else {
          currentChuck = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS);
          if (currentChuck == null) {
            resetState();
            throw new ConsumerTimeoutException();
          }
        }
      } catch (InterruptedException e) {
        LOGGER.error("Error poll from channel.", e);
        resetState();
        throw new RuntimeException(e.getMessage(), e);
      }
      localCurrent = currentChuck.consumerRecords().iterator();
      currentPartitionInfo = currentChuck.partitionOffsetInfo();
      current.set(localCurrent);
    }
    ConsumerRecord item = localCurrent.next();
    while (item.offset() < currentPartitionInfo.consumeOffset() && localCurrent.hasNext()) {
      item = localCurrent.next();
    }
    nextOffset = item.offset() + 1;
    return item;
  }