private def protectPartition[K, V]()

in core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala [104:134]


    private def protectPartition[K, V](
        consumer: ActorRef,
        tp: TopicPartition,
        previouslyCommitted: OffsetAndMetadata,
        partitionRecords: util.List[ConsumerRecord[K, V]])
        : Option[(TopicPartition, util.List[ConsumerRecord[K, V]])] = {
      val threshold = new RecordThreshold(previouslyCommitted.offset(), progress.receivedMessages.get(tp))
      if (threshold.recordsExceedThreshold(threshold, partitionRecords)) {
        // requested and committed are assumed to be kept in-sync, so this _should_ be safe. Fails
        // catastrophically if this is not the case
        val committed = progress.committedOffsets(tp)
        val requestVersusCommitted = previouslyCommitted.offset() - committed.offset()
        if (resetProtection.offsetThreshold < Long.MaxValue &&
          requestVersusCommitted > resetProtection.offsetThreshold) {
          log.warning(
            s"Your last commit request $previouslyCommitted is more than the configured threshold from the last" +
            s"committed offset ($committed) for $tp. See " +
            "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.")
        }
        log.warning(
          s"Dropping offsets for partition $tp - received an offset which is less than allowed $threshold " +
          s"from the  last requested offset (threshold: $threshold). Seeking to the latest known safe (committed " +
          s"or assigned) offset: $committed. See  " +
          "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset" +
          "for more information.")
        consumer ! Seek(Map(tp -> committed.offset()))
        None
      } else {
        Some((tp, partitionRecords))
      }
    }