def managementSetOffset[Offset]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [850:874]


  def managementSetOffset[Offset](offset: Offset): Future[Done] = {
    offset match {
      case t: TimestampOffset =>
        r2dbcExecutor
          .withConnection("set offset") { conn =>
            deleteNewTimestampOffsetsInTx(conn, t.timestamp).flatMap { _ =>
              val records =
                if (t.seen.isEmpty)
                  // we need some persistenceId to be able to store the new offset timestamp
                  Vector(Record(PersistenceId("mgmt", UUID.randomUUID().toString).id, seqNr = 1L, t.timestamp))
                else
                  t.seen.iterator.map { case (pid, seqNr) => Record(pid, seqNr, t.timestamp) }.toVector
              insertTimestampOffsetInTx(conn, records)
            }
          }
          .map(_ => Done)(ExecutionContexts.parasitic)

      case _ =>
        r2dbcExecutor
          .withConnection("set offset") { conn =>
            savePrimitiveOffsetInTx(conn, offset)
          }
          .map(_ => Done)(ExecutionContexts.parasitic)
    }
  }