in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [850:874]
/**
 * Sets the stored offset to `offset`, using a connection provided by `r2dbcExecutor`.
 *
 * For a `TimestampOffset`, `deleteNewTimestampOffsetsInTx` is called with the offset's timestamp and, once
 * that has completed, one record per entry in `seen` is passed to `insertTimestampOffsetInTx`. If `seen` is
 * empty, a single placeholder record with a randomly generated "mgmt" persistence id and `seqNr = 1` is
 * inserted instead. Every other kind of offset is handed to `savePrimitiveOffsetInTx`.
 *
 * @param offset the offset to store
 * @tparam Offset the type of the offset
 * @return a future completed with `Done` when the statements have completed, or failed if any of them failed
 */
def managementSetOffset[Offset](offset: Offset): Future[Done] = {
  // Callers only care about completion, so the statement result is discarded.
  def completed[A](result: Future[A]): Future[Done] =
    result.map(_ => Done)(ExecutionContexts.parasitic)

  def recordsOf(tsOffset: TimestampOffset) =
    if (tsOffset.seen.nonEmpty)
      tsOffset.seen.iterator.map { case (pid, seqNr) => Record(pid, seqNr, tsOffset.timestamp) }.toVector
    else
      // Without any seen persistence id, a synthetic one is required to be able to store the new
      // offset timestamp.
      Vector(Record(PersistenceId("mgmt", UUID.randomUUID().toString).id, seqNr = 1L, tsOffset.timestamp))

  offset match {
    case tsOffset: TimestampOffset =>
      completed(r2dbcExecutor.withConnection("set offset") { connection =>
        // The records are built only after the delete has completed, then inserted on the same connection.
        deleteNewTimestampOffsetsInTx(connection, tsOffset.timestamp)
          .flatMap(_ => insertTimestampOffsetInTx(connection, recordsOf(tsOffset)))
      })
    case _ =>
      completed(r2dbcExecutor.withConnection("set offset") { connection =>
        savePrimitiveOffsetInTx(connection, offset)
      })
  }
}