in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala [189:224]
// Builds a flow that filters out envelopes whose (persistenceId, sequenceNr) pair
// has already passed through, based on a bounded cache of recently seen pairs.
//
// - A `capacity` of 0 yields a plain pass-through flow.
// - Envelopes with an empty `eventOption` are always emitted and never recorded.
// - Once the cache size reaches `(capacity * 1.1).toInt` it is trimmed back to the
//   `capacity` most recently inserted entries.
@InternalApi private[pekko] def deduplicate[Event](
    capacity: Int): Flow[EventEnvelope[Event], EventEnvelope[Event], NotUsed] = {
  if (capacity == 0) {
    Flow[EventEnvelope[Event]]
  } else {
    // trimming is deferred until the cache reaches 110% of capacity (truncated to Int)
    val evictionLimit = (capacity * 1.1).toInt
    Flow[EventEnvelope[Event]]
      .statefulMapConcat(() => {
        // insertion-ordered record of the pid/seqNr pairs already emitted
        var recentKeys = mutable.LinkedHashSet.empty[(String, Long)]
        envelope => {
          if (envelope.eventOption.isEmpty) {
            // don't deduplicate from backtracking
            envelope :: Nil
          } else {
            val key = (envelope.persistenceId, envelope.sequenceNr)
            // `add` returns false when the key was already present
            val firstSighting = recentKeys.add(key)
            if (recentKeys.size >= evictionLimit) {
              // `drop` builds a new set rather than mutating, hence the reassignment
              val surplus = recentKeys.size - capacity
              recentKeys = recentKeys.drop(surplus)
            }
            if (firstSighting) envelope :: Nil else Nil
          }
        }
      })
  }
}