in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [237:313]
override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
// we need to preserve the order / size of this sequence even though we don't map
// AtomicWrites 1:1 with a C* insert
//
// We must NOT catch serialization exceptions here because rejections will cause
// holes in the sequence number series and we use the sequence numbers to detect
// missing (delayed) events in the eventsByTag query.
//
// Note that we assume that all messages have the same persistenceId, which is
// the case for Akka 2.4.2.
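// serialize strips the Tagged wrapper, collects the tags, and serializes each event of the
// AtomicWrite together with its pre-assigned UUID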
def serialize(aw: Seq[(PersistentRepr, UUID)]): Future[SerializedAtomicWrite] = {
val serializedEventsFut: Future[Seq[Serialized]] =
Future.sequence(aw.map {
case (pr, uuid) =>
val (pr2, tags) = pr.payload match {
case Tagged(payload, ts) =>
(pr.withPayload(payload), ts)
case _ =>
(pr, Set.empty[String])
}
serializeEvent(pr2, tags, uuid, settings.eventsByTagSettings.bucketSize, serialization, context.system)
})
serializedEventsFut.map { serializedEvents =>
SerializedAtomicWrite(aw.head._1.persistenceId, serializedEvents)
}
}
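// pair every event with a UUID up front (generateUUID is defined elsewhere in this class);
// the same UUID is carried through serialization and into the tag writes below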
val writesWithUuids: Seq[Seq[(PersistentRepr, UUID)]] =
messages.map(aw => aw.payload.map(pr => (pr, generateUUID(pr))))
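// register the in-flight write so that later operations for this persistenceId can await it
// (cleared via sendWriteFinished below)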
val writeInProgressForPersistentId = Promise[Done]()
val pid = messages.head.persistenceId
writeInProgress.put(pid, writeInProgressForPersistentId.future)
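// Future.successful(Nil) is the plugin API's happy-path result: an empty Seq means no write was rejected;
// any failure here fails the returned future as a whole instead of producing per-write rejections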
val toReturn: Future[Nil.type] = Future.sequence(writesWithUuids.map(w => serialize(w))).flatMap {
(serialized: Seq[SerializedAtomicWrite]) =>
val result: Future[Any] =
if (messages.map(_.payload.size).sum <= journalSettings.maxMessageBatchSize) {
// optimize for the common case
writeMessages(serialized)
} else {
// if persistAll was used, a single AtomicWrite can already contain a complete batch, so we need to regroup the writes correctly
val groups: List[List[SerializedAtomicWrite]] = groupedWrites(serialized.toList.reverse, Nil, Nil)
// execute the groups in sequence
def rec(todo: List[List[SerializedAtomicWrite]]): Future[Any] =
todo match {
case write :: remainder =>
writeMessages(write).flatMap(_ => rec(remainder))
case Nil => FutureUnit
}
rec(groups)
}
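// once the messages-table writes have completed, forward the serialized events to the tag writer actor (when tagging is enabled)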
// The tag writer keeps retrying, but it drops the pending writes for a persistent actor
// when that actor restarts because this write failed
result.flatMap { _ =>
tagWrites match {
case Some(t) =>
implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout)
t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic)
case None => Future.successful(Nil)
}
}
}
// even if the write fails we still need to remove the in-progress entry from the map
toReturn.onComplete { _ =>
sendWriteFinished(pid, writeInProgressForPersistentId)
}
toReturn
}
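
The groupedWrites helper used above is defined elsewhere in this file and is not part of this excerpt. As a rough, self-contained sketch of the regrouping idea (the GroupedWritesSketch object, the simplified payload: Seq[String] field and the explicit maxBatchSize parameter are illustrative placeholders, not the real signatures): it walks the already-reversed list, closes the current group whenever adding the next AtomicWrite would push it past the batch size, and prepends elements so the original order is preserved.

object GroupedWritesSketch {
  // simplified stand-in for the journal's SerializedAtomicWrite
  final case class SerializedAtomicWrite(persistenceId: String, payload: Seq[String])

  // the input list is given in reverse order (as at the call site above) so that
  // prepending rebuilds the original order while keeping the recursion tail-recursive
  @annotation.tailrec
  def groupedWrites(
      reversed: List[SerializedAtomicWrite],
      currentGroup: List[SerializedAtomicWrite],
      grouped: List[List[SerializedAtomicWrite]],
      maxBatchSize: Int): List[List[SerializedAtomicWrite]] =
    reversed match {
      case Nil =>
        if (currentGroup.isEmpty) grouped else currentGroup :: grouped
      case aw :: remaining =>
        val eventsSoFar = currentGroup.map(_.payload.size).sum
        if (currentGroup.nonEmpty && eventsSoFar + aw.payload.size > maxBatchSize)
          // the current group is full: close it and start a new group with this write
          groupedWrites(remaining, List(aw), currentGroup :: grouped, maxBatchSize)
        else
          groupedWrites(remaining, aw :: currentGroup, grouped, maxBatchSize)
    }
}

Called as groupedWrites(serialized.toList.reverse, Nil, Nil, max), this sketch yields groups in the original write order, each holding at most max events, except that a single AtomicWrite larger than max is kept whole in its own group.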