def isDuplicate()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [610:635]


  /**
   * Asks the current state whether `record` is a duplicate.
   *
   * @param record the record to check
   * @return the result of the current state's own `isDuplicate` check for `record`
   */
  def isDuplicate(record: Record): Boolean = {
    val currentState = getState()
    currentState.isDuplicate(record)
  }

  /**
   * Filters a batch of envelopes, keeping only those that are accepted.
   *
   * The envelopes are evaluated one after another (each step is chained on the `Future` of the
   * previous one), starting from the inflight map returned by `getInflight()`. Whenever an
   * envelope is accepted, its `pid -> seqNr` is added to the local inflight map, so that
   * subsequent envelopes in the same batch are checked by `isAccepted` against the envelopes
   * accepted before them.
   *
   * Envelopes for which `createRecordWithOffset` returns `None` are kept without being checked.
   *
   * @param envelopes the envelopes to filter, in processing order
   * @tparam Envelope the envelope type
   * @return a `Future` with the kept envelopes, in the same relative order as in `envelopes`
   */
  def filterAccepted[Envelope](envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[Envelope]] = {
    // Explicit tuple: passing two arguments to the single-parameter `Future.successful` only
    // compiles through argument adaptation (auto-tupling), which is reported by
    // -Xlint:adapted-args.
    val initial = Future.successful((getInflight(), Vector.empty[Envelope]))

    envelopes
      .foldLeft(initial) { (acc, envelope) =>
        acc.flatMap { case (inflight, filteredEnvelopes) =>
          createRecordWithOffset(envelope) match {
            case Some(recordWithOffset) =>
              isAccepted(recordWithOffset, inflight).map {
                case true =>
                  // Track the accepted record so that later envelopes in this batch see it.
                  (
                    inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr),
                    filteredEnvelopes :+ envelope)
                case false =>
                  (inflight, filteredEnvelopes)
              }
            case None =>
              // No record/offset could be created for this envelope: keep it unchecked.
              Future.successful((inflight, filteredEnvelopes :+ envelope))
          }
        }
      }
      .map { case (_, filteredEnvelopes) =>
        filteredEnvelopes
      }
  }