in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala [378:425]
/**
 * Flushes the buffered tag-scanning updates to Cassandra.
 *
 * Buffer rotation happens first: entries gathered in `pendingScanning` since the
 * previous tick are promoted to `toBeWrittenScanning` (they are written on the
 * NEXT tick, provided no write fails), and `pendingScanning` is reset so new
 * entries accumulate until the next tick.
 *
 * Writes are issued in sequential groups of 10 concurrent statements so as not
 * to overload Cassandra (see issue #408): a group is only started once the
 * previous group's futures have all completed. The overall outcome is reported
 * back to this actor via `WriteTagScanningCompleted`; a failure is additionally
 * signalled with `TagWriteFailed`. When there is nothing to write, the next
 * tick is scheduled immediately.
 */
private def writeTagScanning(): Unit = {
  val updates: Seq[(PersistenceId, SequenceNr)] = toBeWrittenScanning.toVector
  // current pendingScanning will be written on next tick, if no write failures
  toBeWrittenScanning = pendingScanning
  // collect new until next tick
  pendingScanning = Map.empty
  if (updates.nonEmpty) {
    if (log.isDebugEnabled) {
      val maxPrint = 20
      log.debug(
        "Update tag scanning [{}]",
        if (updates.size <= maxPrint) updates.take(maxPrint).mkString(",")
        else
          // fixed: overflow count previously used a hard-coded 20 instead of maxPrint,
          // which would silently report a wrong count if maxPrint were ever changed
          updates.take(maxPrint).mkString(",") + s" ...and ${updates.size - maxPrint} more")
    }
    tagWriterSession.taggedPreparedStatements.WriteTagScanning.foreach { ps =>
      val startTime = System.nanoTime()

      // Binds every (pid, seqNr) pair in the group and executes the resulting
      // statements concurrently; completes when all writes in the group have.
      def writeTagScanningBatch(group: Seq[(String, Long)]): Future[Done] = {
        val statements: Seq[BoundStatement] = group.map {
          case (pid, seqNr) => ps.bind(pid, seqNr: JLong)
        }
        Future.traverse(statements)(tagWriterSession.executeWrite).map(_ => Done)
      }

      // Execute 10 async statements at a time to not introduce too much load see issue #408.
      // foldLeft chains the groups strictly sequentially (replaces the previous
      // var + imperative loop with an equivalent immutable fold).
      val result: Future[Done] =
        updates.grouped(10).foldLeft(Future.successful[Done](Done)) { (acc, group) =>
          acc.flatMap(_ => writeTagScanningBatch(group))
        }

      result.onComplete { result =>
        self ! WriteTagScanningCompleted(result, startTime, updates.size)
        result.failed.foreach(self ! TagWriteFailed(_))
      }
    }
  } else {
    scheduleWriteTagScanningTick()
  }
}