in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [446:458]
/**
 * Saves a batch of offsets using the supplied connection.
 *
 * If the batch contains at least one `TimestampOffset`, every `(pid, seqNr)` entry of each
 * timestamp offset's `seen` map is turned into a `Record` carrying that offset's timestamp and
 * the records are passed to `saveTimestampOffsetInTx`. Any non-timestamp offsets in such a mixed
 * batch are ignored.
 *
 * Otherwise only the last offset of the batch is passed to `savePrimitiveOffsetInTx`.
 *
 * An empty batch is a no-op that completes successfully, instead of failing synchronously with a
 * `NoSuchElementException` from `offsets.last`.
 *
 * @param conn    connection handed through to the underlying save operation
 * @param offsets offsets to save, in the order they were produced
 * @return a future completed with `Done` once the underlying save has completed
 */
def saveOffsetsInTx[Offset](conn: Connection, offsets: immutable.IndexedSeq[Offset]): Future[Done] = {
  val timestampOffsets = offsets.collect { case t: TimestampOffset => t }
  if (timestampOffsets.nonEmpty) {
    // The branch is chosen by the presence of a TimestampOffset, not by whether any records
    // result, so a timestamp offset with an empty `seen` still goes through the timestamp path.
    val records = timestampOffsets.flatMap { t =>
      t.seen.map { case (pid, seqNr) => Record(pid, seqNr, t.timestamp) }
    }
    saveTimestampOffsetInTx(conn, records)
  } else {
    offsets.lastOption match {
      case Some(latest) => savePrimitiveOffsetInTx(conn, latest)
      // Nothing to save: return an already-completed future rather than throwing.
      case None         => Future.successful(Done)
    }
  }
}