in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [440:487]
/**
 * Saves the given timestamp offset records through `conn` and, once the insert has completed,
 * publishes the correspondingly updated in-memory state.
 *
 * Records that the current state reports as duplicates are dropped. When more than one record
 * is given, only the last record of each `pid` is considered. If nothing remains, `FutureDone`
 * is returned without inserting anything or updating the state.
 *
 * The returned future fails with an `IllegalStateException` when the state was replaced
 * between reading it here and the completion of the insert.
 *
 * @param conn    connection handed on to `insertTimestampOffsetInTx`
 * @param records candidate records to store
 * @return a future completed with `Done` after the insert and the state update
 */
private def saveTimestampOffsetInTx[Offset](conn: Connection, records: immutable.IndexedSeq[Record]): Future[Done] = {
  idle.set(false)
  val previous = state.get()

  val toStore: immutable.IndexedSeq[Record] =
    if (records.size > 1) {
      // several records: keep the last one per pid, unless the state already knows it
      records
        .groupBy(_.pid)
        .valuesIterator
        .map(_.last)
        .filterNot(candidate => previous.isDuplicate(candidate))
        .toVector
    } else {
      records.filterNot(candidate => previous.isDuplicate(candidate))
    }

  if (toStore.nonEmpty) {
    val added = previous.add(toStore)

    // accumulate some more than the timeWindow before evicting
    val shouldEvict =
      added.size > evictKeepNumberOfEntriesThreshold && added.window.compareTo(evictWindow) > 0

    val next =
      if (!shouldEvict) added
      else {
        val evictUntil = added.latestTimestamp.minus(settings.timeWindow)
        val trimmed = added.evict(evictUntil, settings.keepNumberOfEntries)
        val evictedCount: java.lang.Integer = added.size - trimmed.size
        val keptCount: java.lang.Integer = trimmed.size
        logger.debug(
          "Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].",
          evictedCount,
          evictUntil,
          keptCount,
          added.latestTimestamp)
        trimmed
      }

    // The in-memory state is only swapped after the insert has completed successfully.
    insertTimestampOffsetInTx(conn, toStore).map { _ =>
      if (!state.compareAndSet(previous, next))
        throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.")
      cleanupInflight(next)
      Done
    }
  } else {
    FutureDone
  }
}