override def asyncWriteMessages()

in core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala [116:190]


  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = {
      val timestamp = if (journalSettings.useAppTimestamp) Instant.now() else JournalDao.EmptyDbTimestamp
      val serialized: Try[Seq[SerializedJournalRow]] = Try {
        atomicWrite.payload.map { pr =>
          val (event, tags) = pr.payload match {
            case Tagged(payload, tags) =>
              // eventsBytag not implemented, issue #82, but they are stored
              logEventsByTagsNotImplemented()
              (payload.asInstanceOf[AnyRef], tags)
            case other =>
              (other.asInstanceOf[AnyRef], Set.empty[String])
          }

          val entityType = PersistenceId.extractEntityType(pr.persistenceId)
          val slice = persistenceExt.sliceForPersistenceId(pr.persistenceId)

          val serialized = serialization.serialize(event).get
          val serializer = serialization.findSerializerFor(event)
          val manifest = Serializers.manifestFor(serializer, event)
          val id: Int = serializer.identifier

          val metadata = pr.metadata.map { meta =>
            val m = meta.asInstanceOf[AnyRef]
            val serializedMeta = serialization.serialize(m).get
            val metaSerializer = serialization.findSerializerFor(m)
            val metaManifest = Serializers.manifestFor(metaSerializer, m)
            val id: Int = metaSerializer.identifier
            SerializedEventMetadata(id, metaManifest, serializedMeta)
          }

          SerializedJournalRow(
            slice,
            entityType,
            pr.persistenceId,
            pr.sequenceNr,
            timestamp,
            JournalDao.EmptyDbTimestamp,
            Some(serialized),
            id,
            manifest,
            pr.writerUuid,
            tags,
            metadata)
        }
      }

      serialized match {
        case Success(writes) =>
          journalDao.writeEvents(writes)
        case Failure(exc) =>
          Future.failed(exc)
      }
    }

    val persistenceId = messages.head.persistenceId
    val writeResult: Future[Instant] =
      if (messages.size == 1)
        atomicWrite(messages.head)
      else {
        // persistAsync case
        // easiest to just group all into a single AtomicWrite
        val batch = AtomicWrite(messages.flatMap(_.payload))
        atomicWrite(batch)
      }

    val writeAndPublishResult: Future[Done] =
      publish(messages, writeResult)

    writesInProgress.put(persistenceId, writeAndPublishResult)
    writeAndPublishResult.onComplete { _ =>
      self ! WriteFinished(persistenceId, writeAndPublishResult)
    }
    writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic)
  }