in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala [93:133]
/**
 * Sends a `TagWrite` to `tagWriters` for every tag of `tpr` whose write is missing
 * according to `tagProgress`.
 *
 * A tag write is sent when the tag has no entry in `tagProgress`, or when `tpr.sequenceNr`
 * is greater than the sequence number recorded in that tag's progress. Otherwise nothing is
 * sent for that tag.
 *
 * The event is serialized a single time and the result is reused for every tag, because none
 * of the arguments passed to `serializeEvent` depend on the individual tag.
 *
 * @param tagProgress progress per tag, looked up by tag
 * @param tpr         the tagged event to check; returned unchanged
 * @return a future completed with `tpr` after serialization has finished and the required
 *         `TagWrite` messages have been sent with `!`; failed if serialization fails.
 *         If `tpr` has no tags the future is already completed and nothing is serialized.
 */
def sendMissingTagWrite(tagProgress: Map[Tag, TagProgress])(tpr: TaggedPersistentRepr): Future[TaggedPersistentRepr] =
  if (tpr.tags.isEmpty) Future.successful(tpr)
  else {
    // Loop-invariant: the serialized form is the same for every tag, so compute it once
    // instead of once per tag.
    val serializedFut =
      serializeEvent(
        tpr.pr,
        tpr.tags,
        tpr.offset,
        settings.eventsByTagSettings.bucketSize,
        serialization,
        system)

    serializedFut.map { serialized =>
      tpr.tags.foreach { tag =>
        tagProgress.get(tag) match {
          case None =>
            log.debug(
              "[{}] Tag write not in progress. Sending to TagWriter. Tag [{}] Sequence Nr [{}]",
              tpr.pr.persistenceId,
              tag,
              tpr.sequenceNr)
            tagWriters ! TagWrite(tag, serialized :: Nil)
          case Some(progress) if tpr.sequenceNr > progress.sequenceNr =>
            log.debug(
              "[{}] Sequence nr > than write progress. Sending to TagWriter. Tag [{}] Sequence Nr [{}]",
              tpr.pr.persistenceId,
              tag,
              tpr.sequenceNr)
            tagWriters ! TagWrite(tag, serialized :: Nil)
          case Some(_) =>
            // Recorded progress already covers this sequence number: nothing to send.
            ()
        }
      }
      tpr
    }
  }