in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [91:131]
def add(records: immutable.IndexedSeq[Record]): State = {
records.foldLeft(this) { case (acc, r) =>
val newByPid =
acc.byPid.get(r.pid) match {
case Some(existingRecord) =>
if (r.seqNr > existingRecord.seqNr)
acc.byPid.updated(r.pid, r)
else
acc.byPid // older or same seqNr
case None =>
acc.byPid.updated(r.pid, r)
}
val latestTimestamp = acc.latestTimestamp
val newLatest =
if (r.timestamp.isAfter(latestTimestamp)) {
Vector(r)
} else if (r.timestamp == latestTimestamp) {
acc.latest.find(_.pid == r.pid) match {
case None => acc.latest :+ r
case Some(existingRecord) =>
// keep highest seqNr
if (r.seqNr >= existingRecord.seqNr)
acc.latest.filterNot(_.pid == r.pid) :+ r
else
acc.latest
}
} else {
acc.latest // older than existing latest, keep existing latest
}
val newOldestTimestamp =
if (acc.oldestTimestamp == Instant.EPOCH)
r.timestamp // first record
else if (r.timestamp.isBefore(acc.oldestTimestamp))
r.timestamp
else
acc.oldestTimestamp // this is the normal case
acc.copy(byPid = newByPid, latest = newLatest, oldestTimestamp = newOldestTimestamp)
}
}