public SeekStartOffsetOption getSeekStartOffsetOption()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/fetcher/OriginalTopicKafkaFetcher.java [74:103]


  public SeekStartOffsetOption getSeekStartOffsetOption(
      long specifiedOffset,
      @Nullable Long earliestOffset,
      @Nullable Long latestOffset,
      AutoOffsetResetPolicy autoOffsetResetPolicy) {
    // Chooses the seek strategy for a requested start offset.
    //
    // specifiedOffset:       the offset the caller asked to start from.
    // earliestOffset:        lower bound to validate against; null skips the lower-bound check.
    // latestOffset:          upper bound to validate against; null skips the upper-bound check.
    // autoOffsetResetPolicy: consulted only when the requested offset is out of range.
    //
    // Background: "auto.offset.reset" is the configuration used to create a Kafka consumer and
    // is set by the system manager; the consumer honors it when an offset is out of range.
    // SeekStartOffsetOption is different: it is a runtime configuration set by users and can
    // differ from "auto.offset.reset", so it has to be resolved separately here.

    // A missing (null) bound never marks the requested offset as out of range.
    final boolean belowEarliest = earliestOffset != null && specifiedOffset < earliestOffset;
    final boolean aboveLatest = latestOffset != null && specifiedOffset > latestOffset;

    // In range (or no bounds to compare against): seek exactly where the caller asked.
    if (!belowEarliest && !aboveLatest) {
      return SeekStartOffsetOption.SEEK_TO_SPECIFIED_OFFSET;
    }

    // Out of range: let the reset policy pick the fallback position. Any policy other than
    // EARLIEST or LATEST keeps the specified offset.
    switch (autoOffsetResetPolicy) {
      case AUTO_OFFSET_RESET_POLICY_EARLIEST:
        return SeekStartOffsetOption.SEEK_TO_EARLIEST_OFFSET;
      case AUTO_OFFSET_RESET_POLICY_LATEST:
        return SeekStartOffsetOption.SEEK_TO_LATEST_OFFSET;
      default:
        return SeekStartOffsetOption.SEEK_TO_SPECIFIED_OFFSET;
    }
  }