in core/src/main/scala/org/apache/pekko/persistence/cassandra/reconciler/DeleteTagViewForPersistenceId.scala [48:72]
/**
 * Deletes the tag view entries of `tag` for every persistence id in `persistenceIds`, and
 * afterwards deletes the tag progress and tag scanning entries of each of those persistence ids.
 *
 * The work happens in two phases:
 *
 *  1. All current events for `tag` (starting from `NoOffset`) are streamed, narrowed down to
 *     those whose persistence id is contained in `persistenceIds`, and a tag view delete is
 *     issued for each of them with a parallelism of 1.
 *  1. Once that stream has completed, `deleteTagProgress` and `deleteTagScannning` are invoked
 *     for every persistence id.
 *
 * @return a future completed with `Done` after both phases have finished; it fails if the
 *         stream or any of the issued deletes fails
 */
def execute(): Future[Done] = {
  val tagViewRowsDeleted =
    queries
      .currentEventsByTagInternal(tag, NoOffset)
      .filter(event => persistenceIds.contains(event.persistentRepr.persistenceId))
      // Deletes are issued one at a time. Make the parallelism configurable?
      .mapAsync(1) { event =>
        val pid = event.persistentRepr.persistenceId
        val offset = event.offset
        val timeBucket = TimeBucket(offset, settings.eventsByTagSettings.bucketSize)
        val seqNr = event.tagPidSequenceNr
        log.debug("Issuing delete {} {} {} {}", pid, timeBucket, offset, seqNr)
        session.deleteFromTagView(tag, timeBucket, offset, pid, seqNr)
      }
      .runWith(Sink.ignore)

  tagViewRowsDeleted
    .flatMap { _ =>
      Future.traverse(persistenceIds) { pid =>
        // Both deletes are created up front, before either result is awaited.
        val progressDeleted = session.deleteTagProgress(tag, pid)
        val scanningDeleted = session.deleteTagScannning(pid)
        progressDeleted.flatMap(_ => scanningDeleted.map(_ => Done))
      }
    }
    .map(_ => Done)
}