in core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/DefaultJournalDao.scala [77:124]
private def highestMarkedSequenceNr(persistenceId: String) =
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
def serializeAtomicWrite(aw: AtomicWrite): Try[Seq[(JournalPekkoSerializationRow, Set[String])]] = {
Try(aw.payload.map(serialize))
}
def serialize(pr: PersistentRepr): (JournalPekkoSerializationRow, Set[String]) = {
val (updatedPr, tags) = pr.payload match {
case Tagged(payload, tags) => (pr.withPayload(payload), tags)
case _ => (pr, Set.empty[String])
}
val serializedPayload = PekkoSerialization.serialize(serialization, updatedPr.payload).get
val serializedMetadata = updatedPr.metadata.flatMap(m => PekkoSerialization.serialize(serialization, m).toOption)
val row = JournalPekkoSerializationRow(
Long.MinValue,
updatedPr.deleted,
updatedPr.persistenceId,
updatedPr.sequenceNr,
updatedPr.writerUuid,
updatedPr.timestamp,
updatedPr.manifest,
serializedPayload.payload,
serializedPayload.serId,
serializedPayload.serManifest,
serializedMetadata.map(_.payload),
serializedMetadata.map(_.serId),
serializedMetadata.map(_.serManifest))
(row, tags)
}
val serializedTries = messages.map(serializeAtomicWrite)
val rowsToWrite: Seq[(JournalPekkoSerializationRow, Set[String])] = for {
serializeTry <- serializedTries
row <- serializeTry.getOrElse(Seq.empty)
} yield row
def resultWhenWriteComplete =
if (serializedTries.forall(_.isSuccess)) Nil else serializedTries.map(_.map(_ => ()))
queueWriteJournalRows(rowsToWrite).map(_ => resultWhenWriteComplete)
}