public D readRecordImpl(D reuse)

in gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java [141:206]


  /**
   * Produces the next decodable record, walking the partitions one after another.
   *
   * <p>The method keeps going until it either decodes a record (which is returned immediately) or
   * every partition is reported finished. Along the way it:
   * <ul>
   *   <li>moves on to the next partition when the current one is finished, when fetching a new
   *       message buffer throws, or when the freshly fetched buffer is null/empty;</li>
   *   <li>ignores messages whose offset is below the next watermark of the current partition;</li>
   *   <li>advances the next watermark past a message <em>before</em> trying to decode it, so a
   *       message that cannot be decoded is counted as undecodeable and not revisited by this
   *       loop.</li>
   * </ul>
   *
   * @param reuse not referenced by this implementation
   * @return the decoded record, or {@code null} if shutdown has been requested or all partitions
   *         are finished
   * @throws DataRecordException declared by the signature; not thrown directly in this body
   * @throws IOException declared by the signature; not thrown directly in this body
   */
  public D readRecordImpl(D reuse) throws DataRecordException, IOException {
    if (this.shutdownRequested.get()) {
      return null;
    }

    this.readStartTime = System.nanoTime();

    while (!allPartitionsFinished()) {
      if (currentPartitionFinished()) {
        moveToNextPartition();
        continue;
      }

      // Refill the buffer only when there is nothing left to consume from the previous fetch.
      boolean bufferExhausted = this.messageIterator == null || !this.messageIterator.hasNext();
      if (bufferExhausted) {
        try {
          long fetchBeginNanos = System.nanoTime();
          this.messageIterator = fetchNextMessageBuffer();
          this.statsTracker.onFetchNextMessageBuffer(this.currentPartitionIdx, fetchBeginNanos);
        } catch (Exception fetchFailure) {
          String errorMessage = String.format(
              "Failed to fetch next message buffer for partition %s. Will skip this partition.",
              getCurrentPartition());
          LOG.error(errorMessage, fetchFailure);
          moveToNextPartition();
          continue;
        }

        // A fetch that yields nothing means there is no more data to pull from this partition now.
        boolean nothingFetched = this.messageIterator == null || !this.messageIterator.hasNext();
        if (nothingFetched) {
          moveToNextPartition();
          continue;
        }
      }

      // Drain the current buffer until a record decodes, the buffer runs dry, or the partition
      // is finished. Running dry sends control back to the outer loop for another fetch.
      while (!currentPartitionFinished() && this.messageIterator.hasNext()) {
        KafkaConsumerRecord message = this.messageIterator.next();

        // Kafka may hand back a buffer that begins before the requested offset; drop everything
        // that precedes the offset we actually want.
        if (message.getOffset() < this.nextWatermark.get(this.currentPartitionIdx)) {
          continue;
        }

        // The watermark moves forward regardless of whether decoding succeeds below.
        this.nextWatermark.set(this.currentPartitionIdx, message.getNextOffset());
        try {
          // Start of the decode/convert timing window for this record.
          long decodeBeginNanos = System.nanoTime();

          D decoded = decodeKafkaMessage(message);

          long valueSizeBytes = message.getValueSizeInBytes();
          long logAppendTimestamp = 0L;
          if (message.isTimestampLogAppend()) {
            logAppendTimestamp = message.getTimestamp();
          }
          long creationTimestamp = 0L;
          if (this.recordCreationTimestampFieldName != null) {
            creationTimestamp = message.getRecordCreationTimestamp(
                this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit);
          }

          this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, this.readStartTime,
              decodeBeginNanos, valueSizeBytes, logAppendTimestamp, creationTimestamp);
          this.currentPartitionLastSuccessfulRecord = decoded;
          return decoded;
        } catch (Throwable decodeFailure) {
          // Any failure in decoding or stats reporting is recorded and the loop moves on to the
          // next message instead of propagating.
          this.statsTracker.onUndecodeableRecord(this.currentPartitionIdx);
          if (shouldLogError()) {
            String errorMessage =
                String.format("A record from partition %s cannot be decoded.", getCurrentPartition());
            LOG.error(errorMessage, decodeFailure);
          }
        }
      }
    }

    LOG.info("Finished pulling topic " + this.topicName);
    return null;
  }