in core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala [104:134]
/**
 * Checks one partition's batch of records against the reset-protection threshold and either
 * passes the batch through or drops it and asks the consumer to seek.
 *
 * A `RecordThreshold` is built from the offset of `previouslyCommitted` and whatever
 * `progress.receivedMessages` holds for `tp`. When `recordsExceedThreshold` reports a
 * violation for `partitionRecords`, the batch is discarded, a warning is logged, and a
 * `Seek` to the offset recorded in `progress.committedOffsets` for `tp` is sent to `consumer`.
 *
 * @param consumer actor that receives the `Seek` message when the batch is dropped
 * @param tp partition the records belong to
 * @param previouslyCommitted the last commit request for `tp`; its offset seeds the threshold
 * @param partitionRecords records fetched for `tp`
 * @return `Some((tp, partitionRecords))` when the records are accepted, `None` when they are
 *         dropped and a seek has been requested
 */
private def protectPartition[K, V](
    consumer: ActorRef,
    tp: TopicPartition,
    previouslyCommitted: OffsetAndMetadata,
    partitionRecords: util.List[ConsumerRecord[K, V]])
    : Option[(TopicPartition, util.List[ConsumerRecord[K, V]])] = {
  val threshold = new RecordThreshold(previouslyCommitted.offset(), progress.receivedMessages.get(tp))
  if (threshold.recordsExceedThreshold(threshold, partitionRecords)) {
    // requested and committed are assumed to be kept in-sync, so this _should_ be safe. Fails
    // catastrophically if this is not the case
    val committed = progress.committedOffsets(tp)
    val requestVersusCommitted = previouslyCommitted.offset() - committed.offset()
    // Extra hint for the operator: the gap between the last commit request and the last
    // committed offset is itself larger than the configured threshold. The check is skipped
    // when the threshold is Long.MaxValue.
    if (resetProtection.offsetThreshold < Long.MaxValue &&
      requestVersusCommitted > resetProtection.offsetThreshold) {
      log.warning(
        s"Your last commit request $previouslyCommitted is more than the configured threshold from the last " +
        s"committed offset ($committed) for $tp. See " +
        "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.")
    }
    // Note the explicit separators between the concatenated fragments: without the space after
    // the URL, the trailing text becomes part of the link's fragment identifier.
    log.warning(
      s"Dropping offsets for partition $tp - received an offset which is less than allowed $threshold " +
      s"from the last requested offset (threshold: $threshold). Seeking to the latest known safe (committed " +
      s"or assigned) offset: $committed. See " +
      "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset " +
      "for more information.")
    consumer ! Seek(Map(tp -> committed.offset()))
    None
  } else {
    Some((tp, partitionRecords))
  }
}