in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [610:635]
/**
 * Asks the current state whether the given record is a duplicate.
 *
 * @param record the record to look up
 * @return the result of `isDuplicate` on the state returned by `getState()`
 */
def isDuplicate(record: Record): Boolean = {
  val currentState = getState()
  currentState.isDuplicate(record)
}
/**
 * Filters a batch of envelopes, keeping the ones that should be passed on.
 *
 * The envelopes are checked one at a time, in order: the check for an envelope is started only
 * after the check for the previous one has completed. Each accepted envelope updates a local
 * copy of the in-flight state (seeded from `getInflight()`), and that updated copy is what the
 * following envelopes in the same batch are checked against.
 *
 * For each envelope:
 *  - if `createRecordWithOffset` yields no record, the envelope is kept without further checks;
 *  - if `isAccepted` completes with `true`, the envelope is kept and the record's `seqNr` is
 *    stored under its `pid` in the local in-flight state;
 *  - if `isAccepted` completes with `false`, the envelope is dropped.
 *
 * If any `isAccepted` future fails, the returned future fails with it.
 *
 * @param envelopes the envelopes to check, in processing order
 * @tparam Envelope the envelope type
 * @return the kept envelopes, in their original relative order
 */
def filterAccepted[Envelope](envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[Envelope]] = {
  envelopes
    // The seed is written as an explicit tuple: passing two arguments to the single-parameter
    // `Future.successful` only compiles through argument adaptation (auto-tupling), which the
    // compiler lints against.
    .foldLeft(Future.successful((getInflight(), Vector.empty[Envelope]))) { (acc, envelope) =>
      // Chaining on `acc` is what makes the checks sequential.
      acc.flatMap { case (inflight, filteredEnvelopes) =>
        createRecordWithOffset(envelope) match {
          case Some(recordWithOffset) =>
            isAccepted(recordWithOffset, inflight).map {
              case true =>
                (
                  inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr),
                  filteredEnvelopes :+ envelope)
              case false =>
                (inflight, filteredEnvelopes)
            }
          case None =>
            // No record could be derived from this envelope, so there is nothing to validate.
            Future.successful((inflight, filteredEnvelopes :+ envelope))
        }
      }
    }
    .map { case (_, filteredEnvelopes) =>
      // The threaded in-flight state is local to this call and is discarded here.
      filteredEnvelopes
    }
}