private void resetOffsets()

in streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java [1534:1684]


    /**
     * Repositions {@code mainConsumer} for the given partitions according to the reset policy that
     * applies to each partition's topic.
     *
     * <p>The policy is looked up per topic via {@code topologyMetadata.offsetResetStrategy(...)}; when no
     * topic-specific policy is present, the policy parsed from {@code originalReset} is used instead.
     * Partitions are first sorted into "seek to beginning", "seek to end", "seek by duration" and
     * "no reset" groups, and the consumer is only touched if the "no reset" group is empty.
     *
     * @param partitions the partitions whose position has to be reset
     * @param cause      the exception that triggered the reset, or {@code null}; when non-null it is
     *                   attached as the cause of the {@link StreamsException} thrown below
     * @throws StreamsException      if at least one partition resolves to {@code AutoOffsetResetStrategy.NONE}
     * @throws IllegalStateException if a resolved policy is none of NONE, EARLIEST, LATEST or BY_DURATION
     */
    private void resetOffsets(final Set<TopicPartition> partitions, final Exception cause) {
        final Set<String> loggedTopics = new HashSet<>();
        final Set<TopicPartition> seekToBeginning = new HashSet<>();
        final Set<TopicPartition> seekToEnd = new HashSet<>();
        final Map<TopicPartition, Duration> seekByDuration = new HashMap<>();
        final Set<TopicPartition> notReset = new HashSet<>();

        for (final TopicPartition partition : partitions) {
            final Optional<AutoOffsetResetStrategy> topicStrategy = topologyMetadata.offsetResetStrategy(partition.topic());

            // TODO
            // The lookup may return null if the task we are currently processing was part of a named topology
            // that was just removed. After named topologies are removed, we can update
            // `topologyMetadata.offsetResetStrategy()` so it will not return null any longer, and we can
            // remove this check
            if (topicStrategy == null) {
                continue;
            }

            // A topic-specific policy wins; otherwise fall back to the originally configured one.
            // Both cases are dispatched identically and differ only in the message that gets logged.
            final boolean topicSpecific = topicStrategy.isPresent();
            final AutoOffsetResetStrategy resetPolicy = topicSpecific
                ? topicStrategy.get()
                : AutoOffsetResetStrategy.fromString(originalReset);

            if (resetPolicy == AutoOffsetResetStrategy.NONE) {
                notReset.add(partition);
            } else if (resetPolicy == AutoOffsetResetStrategy.EARLIEST) {
                addToResetList(
                    partition,
                    seekToBeginning,
                    topicSpecific
                        ? "Setting topic '{}' to consume from earliest offset"
                        : "No custom setting defined for topic '{}' using original config 'earliest' for offset reset",
                    loggedTopics
                );
            } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
                addToResetList(
                    partition,
                    seekToEnd,
                    topicSpecific
                        ? "Setting topic '{}' to consume from latest offset"
                        : "No custom setting defined for topic '{}' using original config 'latest' for offset reset",
                    loggedTopics
                );
            } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
                final Duration resetDuration = resetPolicy.duration().get();
                addToResetList(
                    partition,
                    seekByDuration,
                    resetDuration,
                    topicSpecific
                        ? "Setting topic '{}' to consume from by_duration:{}"
                        : "No custom setting defined for topic '{}' using original config 'by_duration:{}' for offset reset",
                    resetDuration.toString(),
                    loggedTopics
                );
            } else {
                throw new IllegalStateException("Unknown reset policy " + resetPolicy);
            }
        }

        // If any partition has no usable policy, fail before moving the consumer at all.
        if (!notReset.isEmpty()) {
            final String notResetString =
                notReset.stream()
                        .map(TopicPartition::topic)
                        .distinct()
                        .collect(Collectors.joining(","));

            final String message = String.format(
                "No valid committed offset found for input [%s] and no valid reset policy configured." +
                    " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
                    "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or " +
                    "StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))",
                notResetString
            );

            throw cause == null
                ? new StreamsException(message)
                : new StreamsException(message, cause);
        }

        if (!seekToBeginning.isEmpty()) {
            mainConsumer.seekToBeginning(seekToBeginning);
        }

        if (!seekToEnd.isEmpty()) {
            mainConsumer.seekToEnd(seekToEnd);
        }

        if (seekByDuration.isEmpty()) {
            return;
        }

        // Translate every "go back by <duration>" request into an absolute timestamp, clamped at 0.
        final long nowMs = time.milliseconds();
        final Map<TopicPartition, Long> seekToTimestamps = new HashMap<>();
        for (final Map.Entry<TopicPartition, Duration> request : seekByDuration.entrySet()) {
            long seekMs = nowMs - request.getValue().toMillis();
            if (seekMs < 0L) {
                log.debug("Cannot reset offset to negative timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs, request.getKey());
                seekMs = 0L;
            }
            seekToTimestamps.put(request.getKey(), seekMs);
        }

        try {
            final Map<TopicPartition, OffsetAndTimestamp> offsetsByPartition = mainConsumer.offsetsForTimes(seekToTimestamps);
            for (final Map.Entry<TopicPartition, OffsetAndTimestamp> lookup : offsetsByPartition.entrySet()) {
                final TopicPartition partition = lookup.getKey();
                final OffsetAndTimestamp seekOffset = lookup.getValue();
                if (seekOffset == null) {
                    // No offset was returned for the requested timestamp: fall back to the end of the partition.
                    log.debug(
                        "Cannot reset offset to non-existing timestamp {} (larger than timestamp of last record)" +
                            " for partition {}. Seeking to end instead.",
                        seekToTimestamps.get(partition),
                        partition
                    );
                    mainConsumer.seekToEnd(Collections.singleton(partition));
                } else {
                    mainConsumer.seek(partition, new OffsetAndMetadata(seekOffset.offset()));
                }
            }
        } catch (final TimeoutException timeoutException) {
            // Hand the timeout to the task manager first (judging by its name it may rethrow);
            // only if it returns normally is the retry logged.
            taskManager.maybeInitTaskTimeoutsOrThrow(seekByDuration.keySet(), timeoutException, now);
            log.debug(
                String.format(
                    "Could not reset offset for %s due to the following exception; will retry.",
                    seekByDuration.keySet()),
                timeoutException
            );
        }
    }