in core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala [72:85]
def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
val serializedTries: Seq[Try[Seq[JournalRow]]] = serializer.serialize(messages)
// If serialization fails for some AtomicWrites, the other AtomicWrites may still be written
val rowsToWrite: Seq[JournalRow] = for {
serializeTry <- serializedTries
row <- serializeTry.getOrElse(Seq.empty)
} yield row
def resultWhenWriteComplete =
if (serializedTries.forall(_.isSuccess)) Nil else serializedTries.map(_.map(_ => ()))
queueWriteJournalRows(rowsToWrite).map(_ => resultWhenWriteComplete)
}