in streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java [1534:1684]
/**
 * Repositions the main consumer for the given partitions according to the offset reset policy that
 * applies to each partition's topic.
 *
 * <p>A topic-specific policy reported by {@code topologyMetadata} takes precedence; when none is
 * defined, the policy parsed from {@code originalReset} is used instead. Partitions are first sorted
 * into "seek to beginning", "seek to end", "seek by duration" and "no reset" groups. If at least one
 * partition ends up in the "no reset" group, no seek is performed at all and a
 * {@link StreamsException} is thrown.
 *
 * @param partitions the partitions whose position needs to be reset
 * @param cause      the exception that triggered the reset; attached as the cause of the thrown
 *                   {@link StreamsException} when non-null
 * @throws StreamsException      if any partition resolves to the {@code NONE} policy
 * @throws IllegalStateException if a resolved policy is none of the handled kinds
 */
private void resetOffsets(final Set<TopicPartition> partitions, final Exception cause) {
    final Set<String> loggedTopics = new HashSet<>();
    final Set<TopicPartition> toBeginning = new HashSet<>();
    final Set<TopicPartition> toEnd = new HashSet<>();
    final Map<TopicPartition, Duration> byDuration = new HashMap<>();
    final Set<TopicPartition> withoutPolicy = new HashSet<>();

    // Phase 1: classify every partition by the policy that applies to its topic.
    for (final TopicPartition topicPartition : partitions) {
        final Optional<AutoOffsetResetStrategy> topicStrategy =
            topologyMetadata.offsetResetStrategy(topicPartition.topic());

        // TODO
        // The lookup may return null if the task we are currently processing was part of a named topology
        // that was just removed. After named topologies are removed, `topologyMetadata.offsetResetStrategy()`
        // can be updated so it never returns null, and this check can go away.
        if (topicStrategy == null) {
            continue;
        }

        // Fall back to the originally configured policy when the topic has no custom one;
        // the flag only selects which log message is emitted.
        final boolean topicSpecific = topicStrategy.isPresent();
        final AutoOffsetResetStrategy policy = topicSpecific
            ? topicStrategy.get()
            : AutoOffsetResetStrategy.fromString(originalReset);

        if (policy == AutoOffsetResetStrategy.NONE) {
            withoutPolicy.add(topicPartition);
        } else if (policy == AutoOffsetResetStrategy.EARLIEST) {
            final String logTemplate = topicSpecific
                ? "Setting topic '{}' to consume from earliest offset"
                : "No custom setting defined for topic '{}' using original config 'earliest' for offset reset";
            addToResetList(topicPartition, toBeginning, logTemplate, loggedTopics);
        } else if (policy == AutoOffsetResetStrategy.LATEST) {
            final String logTemplate = topicSpecific
                ? "Setting topic '{}' to consume from latest offset"
                : "No custom setting defined for topic '{}' using original config 'latest' for offset reset";
            addToResetList(topicPartition, toEnd, logTemplate, loggedTopics);
        } else if (policy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
            final Duration lookBack = policy.duration().get();
            final String logTemplate = topicSpecific
                ? "Setting topic '{}' to consume from by_duration:{}"
                : "No custom setting defined for topic '{}' using original config 'by_duration:{}' for offset reset";
            addToResetList(topicPartition, byDuration, lookBack, logTemplate, lookBack.toString(), loggedTopics);
        } else {
            throw new IllegalStateException("Unknown reset policy " + policy);
        }
    }

    // Phase 2: a single partition without a usable policy aborts the whole reset before any seek happens.
    if (!withoutPolicy.isEmpty()) {
        final String affectedTopics = withoutPolicy.stream()
            .map(TopicPartition::topic)
            .distinct()
            .collect(Collectors.joining(","));
        final String message = String.format(
            "No valid committed offset found for input [%s] and no valid reset policy configured." +
                " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
                "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or " +
                "StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))",
            affectedTopics
        );
        throw cause == null
            ? new StreamsException(message)
            : new StreamsException(message, cause);
    }

    // Phase 3: apply the seeks, group by group.
    if (!toBeginning.isEmpty()) {
        mainConsumer.seekToBeginning(toBeginning);
    }
    if (!toEnd.isEmpty()) {
        mainConsumer.seekToEnd(toEnd);
    }
    if (byDuration.isEmpty()) {
        return;
    }

    // Translate each look-back duration into an absolute timestamp, clamped at 0.
    final long nowMs = time.milliseconds();
    final Map<TopicPartition, Long> targetTimestamps = new HashMap<>();
    for (final Map.Entry<TopicPartition, Duration> lookBackEntry : byDuration.entrySet()) {
        long targetMs = nowMs - lookBackEntry.getValue().toMillis();
        if (targetMs < 0L) {
            log.debug("Cannot reset offset to negative timestamp {} for partition {}. Seeking to timestamp 0 instead.", targetMs, lookBackEntry.getKey());
            targetMs = 0L;
        }
        targetTimestamps.put(lookBackEntry.getKey(), targetMs);
    }

    try {
        final Map<TopicPartition, OffsetAndTimestamp> resolvedOffsets = mainConsumer.offsetsForTimes(targetTimestamps);
        for (final Map.Entry<TopicPartition, OffsetAndTimestamp> resolved : resolvedOffsets.entrySet()) {
            final TopicPartition target = resolved.getKey();
            final OffsetAndTimestamp found = resolved.getValue();
            if (found == null) {
                // No offset was returned for the requested timestamp: fall back to the end of the partition.
                log.debug(
                    "Cannot reset offset to non-existing timestamp {} (larger than timestamp of last record)" +
                        " for partition {}. Seeking to end instead.",
                    targetTimestamps.get(target),
                    target
                );
                mainConsumer.seekToEnd(Collections.singleton(target));
            } else {
                mainConsumer.seek(target, new OffsetAndMetadata(found.offset()));
            }
        }
    } catch (final TimeoutException timeoutException) {
        // Hand the timeout to the task manager first (presumably it rethrows once the task timeout
        // is exhausted -- confirm against TaskManager); otherwise only log, since the reset is retried.
        taskManager.maybeInitTaskTimeoutsOrThrow(byDuration.keySet(), timeoutException, now);
        log.debug(
            String.format(
                "Could not reset offset for %s due to the following exception; will retry.",
                byDuration.keySet()),
            timeoutException
        );
    }
}