private def isAccepted[Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [622:756]


  private def isAccepted[Envelope](
      recordWithOffset: RecordWithOffset,
      currentInflight: Map[Pid, SeqNr]): Future[Boolean] = {
    val pid = recordWithOffset.record.pid
    val seqNr = recordWithOffset.record.seqNr
    val currentState = getState()

    val duplicate = isDuplicate(recordWithOffset.record)

    if (duplicate) {
      logger.trace("Filtering out duplicate sequence number [{}] for pid [{}]", seqNr, pid)
      FutureFalse
    } else if (recordWithOffset.strictSeqNr) {
      // strictSeqNr == true is for event sourced
      val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))

      def logUnexpected(): Unit = {
        if (viaPubSub(recordWithOffset.offset))
          logger.debug(
            "Rejecting pub-sub envelope, unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
            seqNr: java.lang.Long,
            pid,
            prevSeqNr: java.lang.Long,
            recordWithOffset.offset)
        else if (recordWithOffset.envelopeLoaded)
          logger.debug(
            "Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
            seqNr: java.lang.Long,
            pid,
            prevSeqNr: java.lang.Long,
            recordWithOffset.offset)
        else
          logger.warn(
            "Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
            seqNr: java.lang.Long,
            pid,
            prevSeqNr: java.lang.Long,
            recordWithOffset.offset)
      }

      def logUnknown(): Unit = {
        if (viaPubSub(recordWithOffset.offset)) {
          logger.debug(
            "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
            seqNr: java.lang.Long,
            pid,
            recordWithOffset.offset)
        } else if (recordWithOffset.envelopeLoaded) {
          // This may happen rather frequently when using `publish-events`, after reconnecting and such.
          logger.debug(
            "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
            seqNr: java.lang.Long,
            pid,
            recordWithOffset.offset)
        } else {
          logger.warn(
            "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
            seqNr: java.lang.Long,
            pid,
            recordWithOffset.offset)
        }
      }

      if (prevSeqNr > 0) {
        // expecting seqNr to be +1 of previously known
        val ok = seqNr == prevSeqNr + 1
        if (ok) {
          FutureTrue
        } else if (seqNr <= currentInflight.getOrElse(pid, 0L)) {
          // currentInFlight contains those that have been processed or about to be processed in Flow,
          // but offset not saved yet => ok to handle as duplicate
          FutureFalse
        } else if (recordWithOffset.envelopeLoaded) {
          logUnexpected()
          FutureFalse
        } else {
          logUnexpected()
          // This will result in projection restart (with normal configuration)
          Future.failed(
            new IllegalStateException(
              s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr] " +
              "due to unexpected sequence number. " +
              "Please report this issue at https://github.com/apache/incubator-pekko-persistence-r2dbc"))
        }
      } else if (seqNr == 1) {
        // always accept first event if no other event for that pid has been seen
        FutureTrue
      } else {
        // Haven't see seen this pid within the time window. Since events can be missed
        // when read at the tail we will only accept it if the event with previous seqNr has timestamp
        // before the time window of the offset store.
        // Backtracking will emit missed event again.
        timestampOf(pid, seqNr - 1).map {
          case Some(previousTimestamp) =>
            val before = currentState.latestTimestamp.minus(settings.timeWindow)
            if (previousTimestamp.isBefore(before)) {
              logger.debug(
                "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
                "is before time window [{}].",
                pid,
                seqNr: java.lang.Long,
                previousTimestamp,
                before)
              true
            } else if (recordWithOffset.envelopeLoaded) {
              logUnknown()
              false
            } else {
              logUnknown()
              // This will result in projection restart (with normal configuration)
              throw new IllegalStateException(
                s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr], " +
                "due to unknown sequence number. " +
                "Please report this issue at https://github.com/apache/incubator-pekko-persistence-r2dbc")
            }
          case None =>
            // previous not found, could have been deleted
            true
        }
      }
    } else {
      // strictSeqNr == false is for durable state where each revision might not be visible
      val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
      val ok = seqNr > prevSeqNr

      if (ok) {
        FutureTrue
      } else {
        logger.trace("Filtering out earlier revision [{}] for pid [{}], previous revision [{}]", seqNr: java.lang.Long,
          pid,
          prevSeqNr: java.lang.Long)
        FutureFalse
      }
    }
  }