in gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java [141:206]
public D readRecordImpl(D reuse) throws DataRecordException, IOException {
  // Contract: hands back the next record that decodes successfully. Returns null when a shutdown
  // has been requested or once allPartitionsFinished() reports true. The reuse argument is not
  // read anywhere in this method.
  if (this.shutdownRequested.get()) {
    return null;
  }

  // Stamped once per call and passed to the stats tracker when a record is decoded.
  this.readStartTime = System.nanoTime();

  while (!allPartitionsFinished()) {
    if (currentPartitionFinished()) {
      moveToNextPartition();
      continue;
    }

    // Refill the message buffer when there is none yet or the current one is drained.
    if (this.messageIterator == null || !this.messageIterator.hasNext()) {
      try {
        long fetchBeganAtNanos = System.nanoTime();
        this.messageIterator = fetchNextMessageBuffer();
        this.statsTracker.onFetchNextMessageBuffer(this.currentPartitionIdx, fetchBeganAtNanos);
      } catch (Exception fetchFailure) {
        // A failed fetch abandons the current partition instead of failing the whole read.
        LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
            getCurrentPartition()), fetchFailure);
        moveToNextPartition();
        continue;
      }

      // A fetch that produced no messages also ends work on the current partition.
      boolean nothingFetched = this.messageIterator == null || !this.messageIterator.hasNext();
      if (nothingFetched) {
        moveToNextPartition();
        continue;
      }
    }

    // Walk the buffer until a record decodes, the buffer runs dry, or the partition is finished.
    while (!currentPartitionFinished() && this.messageIterator.hasNext()) {
      KafkaConsumerRecord message = this.messageIterator.next();

      // Kafka may hand back a buffer that begins before the requested offset, so anything
      // below the current watermark for this partition is discarded.
      boolean beforeWatermark = message.getOffset() < this.nextWatermark.get(this.currentPartitionIdx);
      if (beforeWatermark) {
        continue;
      }

      // The watermark is advanced before decoding is attempted, so it moves forward even
      // when the decode below fails.
      this.nextWatermark.set(this.currentPartitionIdx, message.getNextOffset());

      try {
        // Decode/convert time is measured from this point.
        long decodeBeganAtNanos = System.nanoTime();
        D decoded = decodeKafkaMessage(message);

        long valueSizeInBytes = message.getValueSizeInBytes();

        // The broker timestamp is reported only for log-append timestamps; otherwise 0.
        long logAppendTimestamp = 0L;
        if (message.isTimestampLogAppend()) {
          logAppendTimestamp = message.getTimestamp();
        }

        // The record creation timestamp is looked up only when a field name is configured.
        long recordCreationTimestamp = 0L;
        if (this.recordCreationTimestampFieldName != null) {
          recordCreationTimestamp = message.getRecordCreationTimestamp(this.recordCreationTimestampFieldName,
              this.recordCreationTimestampUnit);
        }

        this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, this.readStartTime, decodeBeganAtNanos,
            valueSizeInBytes, logAppendTimestamp, recordCreationTimestamp);
        this.currentPartitionLastSuccessfulRecord = decoded;
        return decoded;
      } catch (Throwable decodeFailure) {
        // Any failure while decoding or reporting stats is counted as an undecodeable record;
        // the loop then moves on to the next message. Logging is gated by shouldLogError().
        this.statsTracker.onUndecodeableRecord(this.currentPartitionIdx);
        if (shouldLogError()) {
          LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()),
              decodeFailure);
        }
      }
    }
  }

  LOG.info("Finished pulling topic " + this.topicName);
  return null;
}