in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/fetcher/OriginalTopicKafkaFetcher.java [74:103]
public SeekStartOffsetOption getSeekStartOffsetOption(
    long specifiedOffset,
    @Nullable Long earliestOffset,
    @Nullable Long latestOffset,
    AutoOffsetResetPolicy autoOffsetResetPolicy) {
  /*
   * Decides where to start seeking for the given specified offset.
   *
   * - specifiedOffset: the offset the caller wants to start from.
   * - earliestOffset / latestOffset: known bounds of the valid range; either may be null, in
   *   which case that bound is not checked.
   * - autoOffsetResetPolicy: consulted only when the specified offset lies outside the bounds.
   *
   * Returns SEEK_TO_SPECIFIED_OFFSET when the offset is within range (or the policy is neither
   * EARLIEST nor LATEST); otherwise the seek option that matches the reset policy.
   */

  // A missing (null) bound never makes the specified offset out of range.
  final boolean beforeEarliest = earliestOffset != null && specifiedOffset < earliestOffset;
  final boolean afterLatest = latestOffset != null && specifiedOffset > latestOffset;

  if (!beforeEarliest && !afterLatest) {
    // In range: seek exactly where requested.
    //
    // Background carried over from the original note: the Kafka consumer itself honors
    // "auto.offset.reset" when an offset is out of range. That setting is the configuration used
    // to create the Kafka consumer and is set by the system manager. SeekStartOffsetOption is
    // different: it is the runtime configuration and is set by users. It can differ from
    // "auto.offset.reset", so it is handled separately here.
    return SeekStartOffsetOption.SEEK_TO_SPECIFIED_OFFSET;
  }

  // Out of range: map the reset policy onto the corresponding seek option.
  final SeekStartOffsetOption resolvedOption;
  switch (autoOffsetResetPolicy) {
    case AUTO_OFFSET_RESET_POLICY_EARLIEST:
      resolvedOption = SeekStartOffsetOption.SEEK_TO_EARLIEST_OFFSET;
      break;
    case AUTO_OFFSET_RESET_POLICY_LATEST:
      resolvedOption = SeekStartOffsetOption.SEEK_TO_LATEST_OFFSET;
      break;
    default:
      // Any other policy keeps the specified offset even though it is out of range.
      resolvedOption = SeekStartOffsetOption.SEEK_TO_SPECIFIED_OFFSET;
      break;
  }
  return resolvedOption;
}