in core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala [214:291]
def writeState(state: SerializedStateRow): Future[Done] = {
require(state.revision > 0)
val entityType = PersistenceId.extractEntityType(state.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
def bindTags(stmt: Statement, i: Int): Statement = {
if (state.tags.isEmpty)
stmt.bindNull(i, classOf[Array[String]])
else
stmt.bind(i, state.tags.toArray)
}
val result = {
if (state.revision == 1) {
r2dbcExecutor
.updateOne(s"insert [${state.persistenceId}]") { connection =>
val stmt = connection
.createStatement(insertStateSql)
.bind(0, slice)
.bind(1, entityType)
.bind(2, state.persistenceId)
.bind(3, state.revision)
.bind(4, state.serId)
.bind(5, state.serManifest)
.bind(6, state.payload)
bindTags(stmt, 7)
}
.recoverWith { case _: R2dbcDataIntegrityViolationException =>
Future.failed(
new IllegalStateException(
s"Insert failed: durable state for persistence id [${state.persistenceId}] already exists"))
}
} else {
val previousRevision = state.revision - 1
r2dbcExecutor.updateOne(s"update [${state.persistenceId}]") { connection =>
val stmt = connection
.createStatement(updateStateSql)
.bind(0, state.revision)
.bind(1, state.serId)
.bind(2, state.serManifest)
.bind(3, state.payload)
bindTags(stmt, 4)
if (settings.dbTimestampMonotonicIncreasing) {
if (settings.durableStateAssertSingleWriter)
stmt
.bind(5, state.persistenceId)
.bind(6, previousRevision)
else
stmt
.bind(5, state.persistenceId)
} else {
stmt
.bind(5, state.persistenceId)
.bind(6, previousRevision)
.bind(7, state.persistenceId)
if (settings.durableStateAssertSingleWriter)
stmt.bind(8, previousRevision)
else
stmt
}
}
}
}
result.map { updatedRows =>
if (updatedRows != 1)
throw new IllegalStateException(
s"Update failed: durable state for persistence id [${state.persistenceId}] could not be updated to revision [${state.revision}]")
else {
log.debug("Updated durable state for persistenceId [{}] to revision [{}]", state.persistenceId, state.revision)
Done
}
}
}