def execute()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/reconciler/DeleteTagViewForPersistenceId.scala [48:72]


  def execute(): Future[Done] = {
    queries
      .currentEventsByTagInternal(tag, NoOffset)
      .filter(persistenceIds contains _.persistentRepr.persistenceId)
      // Make the parallelism configurable?
      .mapAsync(1) { uuidPr =>
        val bucket = TimeBucket(uuidPr.offset, settings.eventsByTagSettings.bucketSize)
        val timestamp = uuidPr.offset
        val persistenceId = uuidPr.persistentRepr.persistenceId
        val tagPidSequenceNr = uuidPr.tagPidSequenceNr
        log.debug("Issuing delete {} {} {} {}", persistenceId, bucket, timestamp, tagPidSequenceNr)
        session.deleteFromTagView(tag, bucket, timestamp, persistenceId, tagPidSequenceNr)
      }
      .runWith(Sink.ignore)
      .flatMap(_ =>
        Future.traverse(persistenceIds) { pid =>
          val progress = session.deleteTagProgress(tag, pid)
          val scanning = session.deleteTagScannning(pid)
          for {
            _ <- progress
            _ <- scanning
          } yield Done
        })
      .map(_ => Done)
  }