in core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala [166:264]
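// Writes all events of one atomic write (same persistenceId) and returns the
// db_timestamp of the written events, either assigned by the database or taken
// from the pre-populated value (as used by the MigrationTool).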
def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = {
  require(events.nonEmpty)

  // it's always the same persistenceId for all events
  val persistenceId = events.head.persistenceId
  val previousSeqNr = events.head.seqNr - 1

  // The MigrationTool defines the dbTimestamp to preserve the original event timestamp
  val useTimestampFromDb = events.head.dbTimestamp == Instant.EPOCH
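
  // Binds the positional parameters of the prepared insert statement; the order must
  // match the column order in insertEventWithTransactionTimestampSql /
  // insertEventWithParameterTimestampSql.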
  def bind(stmt: Statement, write: SerializedJournalRow): Statement = {
    stmt
      .bind(0, write.slice)
      .bind(1, write.entityType)
      .bind(2, write.persistenceId)
      .bind(3, write.seqNr)
      .bind(4, write.writerUuid)
      .bind(5, "") // FIXME event adapter
      .bind(6, write.serId)
      .bind(7, write.serManifest)
      .bind(8, write.payload.get)
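
    // tags are stored in an array column; bind a SQL NULL rather than an empty array
    // when there are none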
    if (write.tags.isEmpty)
      stmt.bindNull(9, classOf[Array[String]])
    else
      stmt.bind(9, write.tags.toArray)

    // optional metadata
    write.metadata match {
      case Some(m) =>
        stmt
          .bind(10, m.serId)
          .bind(11, m.serManifest)
          .bind(12, m.payload)
      case None =>
        stmt
          .bindNull(10, classOf[Integer])
          .bindNull(11, classOf[String])
          .bindNull(12, classOf[Array[Byte]])
    }
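
    // The remaining parameters depend on the chosen insert SQL: when
    // dbTimestampMonotonicIncreasing is disabled, the statement also takes the previous
    // event's key (persistenceId, previousSeqNr), presumably so the SQL can ensure the
    // new db_timestamp is not earlier than the previous event's.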
    if (useTimestampFromDb) {
      if (!journalSettings.dbTimestampMonotonicIncreasing)
        stmt
          .bind(13, write.persistenceId)
          .bind(14, previousSeqNr)
    } else {
      if (journalSettings.dbTimestampMonotonicIncreasing)
        stmt
          .bind(13, write.dbTimestamp)
      else
        stmt
          .bind(13, write.dbTimestamp)
          .bind(14, write.persistenceId)
          .bind(15, previousSeqNr)
    }

    stmt
  }
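
  // Use the SQL variant where the database assigns db_timestamp, unless a timestamp
  // was already provided (dbTimestamp != EPOCH, e.g. by the MigrationTool).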
  val insertSql =
    if (useTimestampFromDb) insertEventWithTransactionTimestampSql
    else insertEventWithParameterTimestampSql

  val totalEvents = events.size
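  // Single event: one insert, reading back the timestamp returned by the statement.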
  if (totalEvents == 1) {
    val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")(
      connection => bind(connection.createStatement(insertSql), events.head),
      row => row.get(0, classOf[Instant]))
    if (log.isDebugEnabled())
      result.foreach { _ =>
        log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId)
      }
    if (useTimestampFromDb) {
      result
    } else {
      result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
    }
  } else {
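    // Several events: bind them all into one batched statement (stmt.add() saves the
    // current bindings and starts the next set) so they are written in a single batch.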
    val result = r2dbcExecutor.updateInBatchReturning(s"batch insert [$persistenceId], [$totalEvents] events")(
      connection =>
        events.zipWithIndex.foldLeft(connection.createStatement(insertSql)) { case (stmt, (write, idx)) =>
          if (idx != 0) {
            stmt.add()
          }
          bind(stmt, write)
        },
      row => row.get(0, classOf[Instant]))
    if (log.isDebugEnabled())
      result.foreach { _ =>
        log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, events.head.persistenceId)
      }
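    // Use the first returned timestamp as the db_timestamp of the write when the
    // database assigned it; otherwise return the pre-populated timestamp.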
    if (useTimestampFromDb) {
      result.map(_.head)(ExecutionContexts.parasitic)
    } else {
      result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
    }
  }
}