override def asyncWriteMessages()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [233:309]


  override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
    // we need to preserve the order / size of this sequence even though we don't map
    // AtomicWrites 1:1 with a C* insert
    //
    // We must NOT catch serialization exceptions here because rejections will cause
    // holes in the sequence number series and we use the sequence numbers to detect
    // missing (delayed) events in the eventByTag query.
    //
    // Note that we assume that all messages have the same persistenceId, which is
    // the case for Akka 2.4.2.
    def serialize(aw: Seq[(PersistentRepr, UUID)]): Future[SerializedAtomicWrite] = {
      val serializedEventsFut: Future[Seq[Serialized]] =
        Future.sequence(aw.map {
          case (pr, uuid) =>
            val (pr2, tags) = pr.payload match {
              case Tagged(payload, ts) =>
                (pr.withPayload(payload), ts)
              case _ =>
                (pr, Set.empty[String])
            }
            serializeEvent(pr2, tags, uuid, settings.eventsByTagSettings.bucketSize, serialization, context.system)
        })

      serializedEventsFut.map { serializedEvents =>
        SerializedAtomicWrite(aw.head._1.persistenceId, serializedEvents)
      }
    }

    val writesWithUuids: Seq[Seq[(PersistentRepr, UUID)]] =
      messages.map(aw => aw.payload.map(pr => (pr, generateUUID(pr))))

    val writeInProgressForPersistentId = Promise[Done]()
    val pid = messages.head.persistenceId
    writeInProgress.put(pid, writeInProgressForPersistentId.future)

    val toReturn: Future[Nil.type] = Future.sequence(writesWithUuids.map(w => serialize(w))).flatMap {
      serialized: Seq[SerializedAtomicWrite] =>
        val result: Future[Any] =
          if (messages.map(_.payload.size).sum <= journalSettings.maxMessageBatchSize) {
            // optimize for the common case
            writeMessages(serialized)
          } else {

            // if presistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
            val groups: List[List[SerializedAtomicWrite]] = groupedWrites(serialized.toList.reverse, Nil, Nil)

            // execute the groups in sequence
            def rec(todo: List[List[SerializedAtomicWrite]]): Future[Any] =
              todo match {
                case write :: remainder =>
                  writeMessages(write).flatMap(_ => rec(remainder))
                case Nil => FutureUnit
              }

            rec(groups)
          }

        // The tag writer keeps retrying but will drop writes for a persistent actor when it restarts
        // due to this failing
        result.flatMap { _ =>
          tagWrites match {
            case Some(t) =>
              implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout)
              t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic)
            case None => Future.successful(Nil)
          }
        }

    }

    // if the write fails still need to remove state from the map
    toReturn.onComplete { _ =>
      sendWriteFinished(pid, writeInProgressForPersistentId)
    }

    toReturn
  }