private WorkUnit getWorkUnitForTopicPartition()

in gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java [544:676]


  /**
   * Creates the {@link WorkUnit} for one Kafka partition after deciding which offset the pull should start at.
   *
   * <p>The start offset is chosen in this order:
   * <ol>
   *   <li>If {@code failedToGetKafkaOffsets} is set, the partition is skipped.</li>
   *   <li>If {@code shouldMoveToLatestOffset(partition, state)} holds, start at the latest offset.</li>
   *   <li>If no previous offset was found, apply the {@code BOOTSTRAP_WITH_OFFSET} option
   *       (latest, earliest, latest minus a lookback, or skip).</li>
   *   <li>Otherwise start at the previous offset; if that is out of range, apply the
   *       {@code RESET_ON_OFFSET_OUT_OF_RANGE} option.</li>
   * </ol>
   *
   * <p>As a side effect, the previous-run values that could be read are copied into {@code offsets}.
   *
   * @param partition the partition to create a work unit for
   * @param state the source state that configuration and previous-run values are read from
   * @param topicSpecificState optional topic-level state, passed through to the work unit factories
   * @param offsets offsets holder for this partition; its start offset is set by this method
   * @param failedToGetKafkaOffsets whether the caller failed to obtain earliest/latest offsets for the partition
   * @return a work unit for the partition; an empty work unit (from {@code createEmptyWorkUnit}) when the partition
   *         is skipped but a previous offset exists and must be carried forward; or {@code null} when the partition
   *         is skipped and no previous offset was found
   */
  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
      Optional<State> topicSpecificState, Offsets offsets, boolean failedToGetKafkaOffsets) {

    long previousOffset = 0;
    long previousOffsetFetchEpochTime = 0;
    boolean previousOffsetNotFound = false;
    try {
      previousOffset = getPreviousOffsetForPartition(partition, state);
      offsets.setPreviousEndOffset(previousOffset);
      offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
      offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
      offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
      offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
      previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
      offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
    } catch (PreviousOffsetNotFoundException e) {
      // Not an error: the bootstrap / skip logic below decides what to do without a previous offset.
      previousOffsetNotFound = true;
    }

    if (failedToGetKafkaOffsets) {

      // Increment counts, which will be reported as job metrics
      this.failToGetOffsetCount.incrementAndGet();

      // When unable to get earliest/latest offsets from Kafka, skip the partition and create an empty workunit,
      // so that previousOffset is persisted. Without a previous offset there is nothing to persist, hence null.
      LOG.warn(String
          .format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
              partition));
      return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
          topicSpecificState);
    }

    if (shouldMoveToLatestOffset(partition, state)) {
      offsets.startAtLatestOffset();
    } else if (previousOffsetNotFound) {

      /*
       * When the previous offset cannot be found, either start at the earliest offset, start at the latest offset,
       * go back from the latest offset by a configured lookback (latest - lookback), or skip the partition
       * (no need to create an empty workunit in this case since there's no offset to persist).
       * The lookback option bounds the bootstrap between the two extremes: it avoids consuming a huge amount of
       * data (earliest) and avoids data loss (latest). If the computed offset is out of range, the
       * RESET_ON_OFFSET_OUT_OF_RANGE option decides where to start instead.
       */
      String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
      // Locale.ROOT keeps the comparison independent of the JVM default locale (e.g. Turkish dotless 'i').
      String offsetOption =
          state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase(java.util.Locale.ROOT);
      if (offsetOption.equals(LATEST_OFFSET)) {
        LOG.warn(offsetNotFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
        offsets.startAtLatestOffset();
      } else if (offsetOption.equals(EARLIEST_OFFSET)) {
        LOG.warn(
            offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
        offsets.startAtEarliestOffset();
      } else if (offsetOption.equals(OFFSET_LOOKBACK)) {
        long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK, 0L);
        long latestOffset = offsets.getLatestOffset();
        long offset = latestOffset - lookbackOffsetRange;
        LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ]  start offset: " + offset);
        try {
          offsets.startAt(offset);
        } catch (StartOffsetOutOfRangeException e) {
          if (!resolveOutOfRangeStartOffset(partition, state, offsets)) {
            // Skipping with no previous offset: there is no watermark to persist, so do not create an empty
            // workunit carrying a placeholder offset. This matches the other "previous offset not found" skips.
            return null;
          }
        }
      } else {
        LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
        return null;
      }
    } else {
      try {
        offsets.startAt(previousOffset);
      } catch (StartOffsetOutOfRangeException e) {
        if (!resolveOutOfRangeStartOffset(partition, state, offsets)) {
          // Skipping: create an empty workunit so that previousOffset is persisted.
          return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
        }
      }
    }
    WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
    addSourceStatePropsToWorkUnit(workUnit, state);
    return workUnit;
  }

  /**
   * Handles a start offset that was rejected as out of range: updates the too-early / too-late counters and then
   * applies the {@code RESET_ON_OFFSET_OUT_OF_RANGE} option to pick a replacement start offset.
   *
   * <p>The option is interpreted as follows: "latest" starts at the latest offset; "nearest" starts at the latest
   * offset when the start offset is at or beyond it, and at the earliest offset otherwise; "earliest" starts at the
   * earliest offset; any other value means the partition is skipped.
   *
   * @param partition the partition whose start offset was out of range (used for logging)
   * @param state the source state the reset option is read from
   * @param offsets offsets holder for the partition; its start offset is reset unless the partition is skipped
   * @return {@code true} if a start offset was chosen, {@code false} if the partition should be skipped
   */
  private boolean resolveOutOfRangeStartOffset(KafkaPartition partition, SourceState state, Offsets offsets) {
    // NOTE(review): offsets.getStartOffset() is read after startAt(...) threw. Whether Offsets records the rejected
    // value before throwing is not visible here; confirm the counters and the logged "Start offset" reflect the
    // offset that was actually attempted.

    // Increment counts, which will be reported as job metrics
    if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
      this.offsetTooEarlyCount.incrementAndGet();
    } else {
      this.offsetTooLateCount.incrementAndGet();
    }

    String offsetOutOfRangeMsg = String.format(
        "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
        partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
    // Locale.ROOT keeps the comparison independent of the JVM default locale (e.g. Turkish dotless 'i').
    String resetOption = state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE)
        .toLowerCase(java.util.Locale.ROOT);

    if (resetOption.equals(LATEST_OFFSET) || (resetOption.equals(NEAREST_OFFSET)
        && offsets.getStartOffset() >= offsets.getLatestOffset())) {
      LOG.warn(
          offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
      offsets.startAtLatestOffset();
      return true;
    }
    if (resetOption.equals(EARLIEST_OFFSET) || resetOption.equals(NEAREST_OFFSET)) {
      LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
          .getEarliestOffset());
      offsets.startAtEarliestOffset();
      return true;
    }

    LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
    return false;
  }