in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/ConsumerIterator.java [56:86]
public ConsumerRecord makeNext() {
Iterator<ConsumerRecord> localCurrent = current.get();
// if we don't have an iterator, get one
if (localCurrent == null || !localCurrent.hasNext() || consumedEndBounded()) {
FetchedDataChunk currentChuck;
try {
if (consumerTimeoutMs < 0) {
currentChuck = channel.take();
} else {
currentChuck = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS);
if (currentChuck == null) {
resetState();
throw new ConsumerTimeoutException();
}
}
} catch (InterruptedException e) {
LOGGER.error("Error poll from channel.", e);
resetState();
throw new RuntimeException(e.getMessage(), e);
}
localCurrent = currentChuck.consumerRecords().iterator();
currentPartitionInfo = currentChuck.partitionOffsetInfo();
current.set(localCurrent);
}
ConsumerRecord item = localCurrent.next();
while (item.offset() < currentPartitionInfo.consumeOffset() && localCurrent.hasNext()) {
item = localCurrent.next();
}
nextOffset = item.offset() + 1;
return item;
}