override def asyncWriteMessages()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/DefaultJournalDao.scala [80:124]


  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {

    def serializeAtomicWrite(aw: AtomicWrite): Try[Seq[(JournalPekkoSerializationRow, Set[String])]] = {
      Try(aw.payload.map(serialize))
    }

    def serialize(pr: PersistentRepr): (JournalPekkoSerializationRow, Set[String]) = {

      val (updatedPr, tags) = pr.payload match {
        case Tagged(payload, tags) => (pr.withPayload(payload), tags)
        case _                     => (pr, Set.empty[String])
      }

      val serializedPayload = PekkoSerialization.serialize(serialization, updatedPr.payload).get
      val serializedMetadata = updatedPr.metadata.flatMap(m => PekkoSerialization.serialize(serialization, m).toOption)
      val row = JournalPekkoSerializationRow(
        Long.MinValue,
        updatedPr.deleted,
        updatedPr.persistenceId,
        updatedPr.sequenceNr,
        updatedPr.writerUuid,
        updatedPr.timestamp,
        updatedPr.manifest,
        serializedPayload.payload,
        serializedPayload.serId,
        serializedPayload.serManifest,
        serializedMetadata.map(_.payload),
        serializedMetadata.map(_.serId),
        serializedMetadata.map(_.serManifest))

      (row, tags)
    }

    val serializedTries = messages.map(serializeAtomicWrite)

    val rowsToWrite: Seq[(JournalPekkoSerializationRow, Set[String])] = for {
      serializeTry <- serializedTries
      row <- serializeTry.getOrElse(Seq.empty)
    } yield row

    def resultWhenWriteComplete =
      if (serializedTries.forall(_.isSuccess)) Nil else serializedTries.map(_.map(_ => ()))

    queueWriteJournalRows(rowsToWrite).map(_ => resultWhenWriteComplete)
  }