def sendMissingTagWrite()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala [93:133]


  /**
   * Sends the given event to the tag writers for each of its tags that is not yet covered by the
   * supplied tag progress.
   *
   * For every tag of `tpr` a [[TagWrite]] containing the serialized event is sent to `tagWriters` when
   * either there is no entry for the tag in `tagProgress`, or the event's sequence number is greater
   * than the sequence number recorded in that entry. Otherwise nothing is sent for that tag.
   *
   * @param tagProgress progress per tag, looked up with each of the event's tags
   * @param tpr the tagged event to check
   * @return a future completed with the unchanged `tpr` once the event has been serialized and the
   *         tag writes have been handed to `tagWriters` via `!`; it is failed if serialization fails.
   *         If `tpr` has no tags the future is already completed and nothing is serialized or sent.
   */
  def sendMissingTagWrite(tagProgress: Map[Tag, TagProgress])(tpr: TaggedPersistentRepr): Future[TaggedPersistentRepr] =
    if (tpr.tags.isEmpty) Future.successful(tpr)
    else {
      // None of the serializeEvent arguments depend on the individual tag, so the event is serialized
      // once and the result reused for every tag instead of being re-serialized per tag.
      // NOTE(review): assumes the serialized value can be shared between several TagWrite messages — confirm.
      serializeEvent(
        tpr.pr,
        tpr.tags,
        tpr.offset,
        settings.eventsByTagSettings.bucketSize,
        serialization,
        system).map { serialized =>
        tpr.tags.foreach { tag =>
          tagProgress.get(tag) match {
            case None =>
              // No progress entry for this tag: always send the write.
              log.debug(
                "[{}] Tag write not in progress. Sending to TagWriter. Tag [{}] Sequence Nr [{}]",
                tpr.pr.persistenceId,
                tag,
                tpr.sequenceNr)
              tagWriters ! TagWrite(tag, serialized :: Nil)
            case Some(progress) =>
              // Only send when this event is beyond the recorded progress for the tag.
              if (tpr.sequenceNr > progress.sequenceNr) {
                log.debug(
                  "[{}] Sequence nr > than write progress. Sending to TagWriter. Tag [{}] Sequence Nr [{}]",
                  tpr.pr.persistenceId,
                  tag,
                  tpr.sequenceNr)
                tagWriters ! TagWrite(tag, serialized :: Nil)
              }
          }
        }
        tpr
      }
    }