in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [529:547]
/**
 * Drops from the in-flight entries every (pid, seqNr) pair for which `newState.byPid`
 * holds a record for the same pid whose `seqNr` is equal to or greater than the
 * in-flight seqNr. Pairs whose pid has no record in `newState.byPid` are retained.
 *
 * The filtered result is published with a compare-and-set against the value read at
 * the start of the call; when that fails the whole procedure is repeated.
 *
 * @param newState state whose `byPid` records decide which in-flight entries are kept
 * @throws IllegalStateException if 10000 or more entries remain after filtering
 */
@tailrec private def cleanupInflight(newState: State): Unit = {
  val snapshot = getInflight()
  val newInflight = snapshot.filter {
    case (pid, seqNr) =>
      // Retain when nothing is recorded for this pid, or the recorded seqNr is still behind.
      newState.byPid.get(pid).forall(record => record.seqNr < seqNr)
    case _ =>
      // Catch-all preserved as-is: anything that is not a pair is retained.
      true
  }

  // The size check runs before the swap, so an oversized result is never published.
  if (newInflight.size >= 10000)
    throw new IllegalStateException(
      s"Too many envelopes in-flight [${newInflight.size}]. " +
      "Please report this issue at https://github.com/apache/incubator-pekko-persistence-r2dbc")

  val swapped = inflight.compareAndSet(snapshot, newInflight)
  if (!swapped)
    cleanupInflight(newState) // CAS lost to a concurrent update of inflight, start over
}