in core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala [181:216]
def store(serializedRow: SerializedSnapshotRow): Future[Unit] = {
val entityType = PersistenceId.extractEntityType(serializedRow.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(serializedRow.persistenceId)
r2dbcExecutor
.updateOne(s"upsert snapshot [${serializedRow.persistenceId}], sequence number [${serializedRow.seqNr}]") {
connection =>
val statement =
connection
.createStatement(upsertSql)
.bind(0, slice)
.bind(1, entityType)
.bind(2, serializedRow.persistenceId)
.bind(3, serializedRow.seqNr)
.bind(4, serializedRow.writeTimestamp)
.bind(5, serializedRow.snapshot)
.bind(6, serializedRow.serializerId)
.bind(7, serializedRow.serializerManifest)
serializedRow.metadata match {
case Some(SerializedSnapshotMetadata(serializedMeta, serializerId, serializerManifest)) =>
statement
.bind(8, serializedMeta)
.bind(9, serializerId)
.bind(10, serializerManifest)
case None =>
statement
.bindNull(8, classOf[Array[Byte]])
.bindNull(9, classOf[Integer])
.bindNull(10, classOf[String])
}
statement
}
.map(_ => ())(ExecutionContexts.parasitic)
}