in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [509:549]
/**
 * Inserts the given timestamp offset records with a statement created on `conn`.
 *
 * A single record is executed via `R2dbcExecutor.updateOneInTx`; several records are
 * bound onto the same statement (separated by `Statement.add()`) and executed via
 * `R2dbcExecutor.updateBatchInTx`.
 *
 * @param conn connection on which the insert statement is created
 * @param records records to store; must be non-empty
 * @return the future returned by the executor (presumably the update count - confirm
 *         against `R2dbcExecutor`)
 * @throws IllegalArgumentException synchronously, if `records` is empty or if the slice of
 *                                  any record is outside the slice range of the source provider
 */
private def insertTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Long] = {
  // Validates that the record's slice is within the managed range and binds the
  // six positional parameters of the insert statement for that record.
  def bindRecord(stmt: Statement, record: Record): Statement = {
    val slice = persistenceExt.sliceForPersistenceId(record.pid)
    val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
    val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
    if (slice < minSlice || slice > maxSlice)
      // Both halves of the message need the `s` interpolator, otherwise the
      // placeholders are emitted verbatim instead of the actual values.
      throw new IllegalArgumentException(
        s"This offset store [$projectionId] manages slices " +
        s"[$minSlice - $maxSlice] but received slice [$slice] for persistenceId [${record.pid}]")
    stmt
      .bind(0, projectionId.name)
      .bind(1, projectionId.key)
      .bind(2, slice)
      .bind(3, record.pid)
      .bind(4, record.seqNr)
      .bind(5, record.timestamp)
  }

  // Guards the `records.last` / `records.head` accesses below.
  require(records.nonEmpty)

  // FIXME change to trace
  logger.debug("saving timestamp offset [{}], {}", records.last.timestamp: Any, records: Any)

  val statement = conn.createStatement(insertTimestampOffsetSql)

  if (records.size == 1) {
    val boundStatement = bindRecord(statement, records.head)
    R2dbcExecutor.updateOneInTx(boundStatement)
  } else {
    // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
    val boundStatement =
      records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) =>
        // `add()` closes the previous set of bindings; it is not needed before the first record.
        if (idx != 0) {
          stmt.add()
        }
        bindRecord(stmt, rec)
      }
    R2dbcExecutor.updateBatchInTx(boundStatement)
  }
}