in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [470:503]
private def deserializeAndAddOffset(timestampOffset: TimestampOffset): Flow[Row, Envelope, NotUsed] = {
  Flow[Row].statefulMapConcat { () =>
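    // state per stream materialization: the db timestamp of the rows currently
    // being processed and the highest emitted seqNr per persistence id at that
    // timestamp, seeded from the starting offset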
    var currentTimestamp = timestampOffset.timestamp
    var currentSequenceNrs: Map[String, Long] = timestampOffset.seen
    row => {
      if (row.dbTimestamp == currentTimestamp) {
        // has this already been seen?
        if (currentSequenceNrs.get(row.persistenceId).exists(_ >= row.seqNr)) {
          if (currentSequenceNrs.size >= settings.querySettings.bufferSize) {
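            // the seen map has grown to the full buffer size, so duplicates can
            // no longer be filtered reliably; fail rather than emit duplicates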
            throw new IllegalStateException(
              s"Too many events stored with the same timestamp [$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]")
          }
          log.trace(
            "filtering [{}] [{}] as db timestamp is the same as last offset and is in seen [{}]",
            row.persistenceId,
            row.seqNr: java.lang.Long,
            currentSequenceNrs)
          Nil
        } else {
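          // first occurrence of this seqNr at the current timestamp:
          // record it in the seen map and emit the envelope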
          currentSequenceNrs = currentSequenceNrs.updated(row.persistenceId, row.seqNr)
          val offset =
            TimestampOffset(row.dbTimestamp, row.readDbTimestamp, currentSequenceNrs)
          createEnvelope(offset, row) :: Nil
        }
      } else {
        // new timestamp, reset currentSequenceNrs
        currentTimestamp = row.dbTimestamp
        currentSequenceNrs = Map(row.persistenceId -> row.seqNr)
        val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, currentSequenceNrs)
        createEnvelope(offset, row) :: Nil
      }
    }
  }
}
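
// A minimal standalone sketch (not from this file) of the same deduplication
// technique, assuming a simplified Rec(persistenceId, seqNr, timestamp) row;
// Rec and dedup are hypothetical names. Rows carrying the previously seen
// timestamp are dropped when their seqNr was already emitted, which is how
// overlapping pages of a timestamp-ordered query avoid duplicates.
import java.time.Instant

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow

final case class Rec(persistenceId: String, seqNr: Long, timestamp: Instant)

def dedup(startTimestamp: Instant, seen: Map[String, Long]): Flow[Rec, Rec, NotUsed] =
  Flow[Rec].statefulMapConcat { () =>
    var currentTimestamp = startTimestamp
    var currentSeqNrs = seen
    rec =>
      if (rec.timestamp == currentTimestamp) {
        // same timestamp as the last emitted row: drop if already seen
        if (currentSeqNrs.get(rec.persistenceId).exists(_ >= rec.seqNr)) Nil
        else {
          currentSeqNrs = currentSeqNrs.updated(rec.persistenceId, rec.seqNr)
          rec :: Nil
        }
      } else {
        // new timestamp: reset the seen map
        currentTimestamp = rec.timestamp
        currentSeqNrs = Map(rec.persistenceId -> rec.seqNr)
        rec :: Nil
      }
  }
// usage: Source(rows).via(dedup(start, Map.empty)) emits each row at most once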