in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [330:372]
/**
 * Reads the timestamp offset records stored for this projection and installs them as the new
 * in-memory state.
 *
 * The query is restricted to the slice range of the `sourceProvider` when one is defined,
 * otherwise to the range `0` to `persistenceExt.numberOfSlices - 1`, and to the name of
 * `projectionId`.
 *
 * @return a future completed with `None` when the loaded state equals `State.empty`, otherwise
 *         with the `latestOffset` of the loaded state. The future fails with an
 *         `IllegalStateException` if `state` no longer holds the value that was observed before
 *         the query was issued.
 */
private def readTimestampOffset(): Future[Option[TimestampOffset]] = {
  idle.set(false)
  // Snapshot taken before the query; it is the expected value of the compare-and-set below.
  val stateBeforeRead = state.get()
  val (fromSlice, toSlice) =
    sourceProvider.fold((0, persistenceExt.numberOfSlices - 1))(p => (p.minSlice, p.maxSlice))

  r2dbcExecutor
    .select("read timestamp offset")(
      connection => {
        logger.trace("reading timestamp offset for [{}]", projectionId)
        val statement = connection.createStatement(selectTimestampOffsetSql)
        // Bind positions: 0 = lower slice bound, 1 = upper slice bound, 2 = projection name.
        statement
          .bind(0, fromSlice)
          .bind(1, toSlice)
          .bind(2, projectionId.name)
      },
      row => {
        val persistenceId = row.get("persistence_id", classOf[String])
        val sequenceNr = row.get("seq_nr", classOf[java.lang.Long])
        val offsetTimestamp = row.get("timestamp_offset", classOf[Instant])
        Record(persistenceId, sequenceNr, offsetTimestamp)
      })
    .map { loadedRecords =>
      val freshState = State(loadedRecords)
      logger.debug(
        "readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}]",
        freshState.byPid.size: java.lang.Integer,
        freshState.oldestTimestamp,
        freshState.latestTimestamp)
      // Only replace the state if nothing else changed it while the query was running.
      val installed = state.compareAndSet(stateBeforeRead, freshState)
      if (!installed)
        throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
      clearInflight()
      if (freshState != State.empty) freshState.latestOffset else None
    }
}