private def insertTimestampOffsetInTx()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [509:549]

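Within an already-open transaction, this method binds each record to the timestamp-offset insert statement, after verifying that the record's slice belongs to this offset store, and executes either a single insert or an R2DBC batch depending on the number of records, returning a Future[Long] (the update count).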

  private def insertTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Long] = {
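    // Binds one record's columns to the prepared insert statement, first
    // verifying that the record's slice falls within the slice range owned
    // by this offset store instance.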
    def bindRecord(stmt: Statement, record: Record): Statement = {
      val slice = persistenceExt.sliceForPersistenceId(record.pid)
      val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
      val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
      if (slice < minSlice || slice > maxSlice)
        throw new IllegalArgumentException(
          s"This offset store [$projectionId] manages slices " +
          s"[$minSlice - $maxSlice] but received slice [$slice] for persistenceId [${record.pid}]")

      stmt
        .bind(0, projectionId.name)
        .bind(1, projectionId.key)
        .bind(2, slice)
        .bind(3, record.pid)
        .bind(4, record.seqNr)
        .bind(5, record.timestamp)
    }

    require(records.nonEmpty)

    // FIXME change to trace
    logger.debug("saving timestamp offset [{}], {}", records.last.timestamp: Any, records: Any)

    val statement = conn.createStatement(insertTimestampOffsetSql)

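    // One record executes as a plain statement; several records are combined
    // into a single R2DBC batch, with add() separating each row's bindings.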
    if (records.size == 1) {
      val boundStatement = bindRecord(statement, records.head)
      R2dbcExecutor.updateOneInTx(boundStatement)
    } else {
      // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
      val boundStatement =
        records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) =>
          if (idx != 0) {
            stmt.add()
          }
          bindRecord(stmt, rec)
        }
      R2dbcExecutor.updateBatchInTx(boundStatement)
    }
  }
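
For context, here is a minimal, self-contained sketch of the underlying R2DBC bind/add/execute batch pattern that the multi-record branch builds on, and that R2dbcExecutor.updateBatchInTx presumably wraps before adapting the reactive result to a Future[Long]. The table, columns, and the insertOffsets helper are hypothetical; the sketch assumes R2DBC SPI 1.0 (where Result.getRowsUpdated emits java.lang.Long) and Project Reactor on the classpath.

  import io.r2dbc.spi.{ Connection, Result, Statement }
  import reactor.core.publisher.{ Flux, Mono }

  // Hypothetical helper: batch-insert (pid, seqNr) pairs and sum the update counts.
  def insertOffsets(conn: Connection, rows: Seq[(String, Long)]): Mono[java.lang.Long] = {
    require(rows.nonEmpty)

    // Hypothetical table and columns, for illustration only.
    val stmt: Statement =
      conn.createStatement("INSERT INTO example_offset (pid, seq_nr) VALUES ($1, $2)")

    rows.zipWithIndex.foreach { case ((pid, seqNr), idx) =>
      if (idx != 0) stmt.add() // add() seals the previous row's bindings and starts the next
      stmt.bind(0, pid).bind(1, seqNr)
    }

    // execute() emits one Result per bound row; summing getRowsUpdated yields the
    // total number of inserted rows, analogous to the Future[Long] returned above.
    Flux
      .from[Result](stmt.execute())
      .concatMap((r: Result) => r.getRowsUpdated)
      .reduce((acc: java.lang.Long, n: java.lang.Long) => java.lang.Long.valueOf(acc + n))
  }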