in core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala [116:190]
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = {
val timestamp = if (journalSettings.useAppTimestamp) Instant.now() else JournalDao.EmptyDbTimestamp
val serialized: Try[Seq[SerializedJournalRow]] = Try {
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
case Tagged(payload, tags) =>
// eventsBytag not implemented, issue #82, but they are stored
logEventsByTagsNotImplemented()
(payload.asInstanceOf[AnyRef], tags)
case other =>
(other.asInstanceOf[AnyRef], Set.empty[String])
}
val entityType = PersistenceId.extractEntityType(pr.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(pr.persistenceId)
val serialized = serialization.serialize(event).get
val serializer = serialization.findSerializerFor(event)
val manifest = Serializers.manifestFor(serializer, event)
val id: Int = serializer.identifier
val metadata = pr.metadata.map { meta =>
val m = meta.asInstanceOf[AnyRef]
val serializedMeta = serialization.serialize(m).get
val metaSerializer = serialization.findSerializerFor(m)
val metaManifest = Serializers.manifestFor(metaSerializer, m)
val id: Int = metaSerializer.identifier
SerializedEventMetadata(id, metaManifest, serializedMeta)
}
SerializedJournalRow(
slice,
entityType,
pr.persistenceId,
pr.sequenceNr,
timestamp,
JournalDao.EmptyDbTimestamp,
Some(serialized),
id,
manifest,
pr.writerUuid,
tags,
metadata)
}
}
serialized match {
case Success(writes) =>
journalDao.writeEvents(writes)
case Failure(exc) =>
Future.failed(exc)
}
}
val persistenceId = messages.head.persistenceId
val writeResult: Future[Instant] =
if (messages.size == 1)
atomicWrite(messages.head)
else {
// persistAsync case
// easiest to just group all into a single AtomicWrite
val batch = AtomicWrite(messages.flatMap(_.payload))
atomicWrite(batch)
}
val writeAndPublishResult: Future[Done] =
publish(messages, writeResult)
writesInProgress.put(persistenceId, writeAndPublishResult)
writeAndPublishResult.onComplete { _ =>
self ! WriteFinished(persistenceId, writeAndPublishResult)
}
writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic)
}