in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala [85:112]
/**
 * Stores `value` as the state of `persistenceId` at the given `revision`.
 *
 * The value is serialized before any database work is scheduled; a serialization failure
 * is surfaced as a failed future. Revision 1 runs the `insertDurableState` action, every
 * other revision runs the `updateDurableState` action.
 *
 * @param persistenceId identifier of the entity whose state is written
 * @param revision revision of the state being written; must be greater than 0
 * @param value the state object to serialize and store
 * @param tag tag stored with the row; `null` or a blank string is stored as `None`
 * @return a future completed with `Done` when the statement reports a non-zero number of
 *         affected rows, or failed with `IllegalStateException` when it reports zero
 * @throws java.lang.IllegalArgumentException thrown synchronously (not via the future)
 *                                            when `revision` is not greater than 0
 */
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = {
  require(revision > 0)

  // `null` and whitespace-only strings are both treated as "no value".
  def nonBlank(text: String): Option[String] = Option(text).filter(_.trim.nonEmpty)

  // Built eagerly (as a Try) so the timestamp reflects the moment of the call.
  val serializedRow =
    PekkoSerialization.serialize(serialization, value).map { ser =>
      DurableStateTables.DurableStateRow(
        0, // insert 0 for autoinc columns
        persistenceId,
        revision,
        ser.payload,
        nonBlank(tag),
        ser.serId,
        nonBlank(ser.serManifest),
        System.currentTimeMillis)
    }

  val affectedRows =
    Future.fromTry(serializedRow).flatMap { stateRow =>
      // The very first revision creates the row; later revisions modify the existing one.
      val statement =
        if (revision == 1) insertDurableState(stateRow)
        else updateDurableState(stateRow)
      db.run(statement)
    }

  affectedRows.map { count =>
    // Zero affected rows is reported to the caller as a revision mismatch.
    if (count != 0) Done
    else
      throw new IllegalStateException(
        s"Incorrect revision number [$revision] provided: It has to be 1 more than the value existing in the database for persistenceId [$persistenceId]")
  }
}