private def writeTagScanning()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala [378:425]


  private def writeTagScanning(): Unit = {

    val updates: Seq[(PersistenceId, SequenceNr)] = toBeWrittenScanning.toVector
    // current pendingScanning will be written on next tick, if no write failures
    toBeWrittenScanning = pendingScanning
    // collect new until next tick
    pendingScanning = Map.empty

    if (updates.nonEmpty) {

      if (log.isDebugEnabled) {
        val maxPrint = 20
        log.debug(
          "Update tag scanning [{}]",
          if (updates.size <= maxPrint) updates.take(maxPrint).mkString(",")
          else
            updates.take(maxPrint).mkString(",") + s" ...and ${updates.size - 20} more")
      }

      tagWriterSession.taggedPreparedStatements.WriteTagScanning.futureResult().foreach { ps =>
        val startTime = System.nanoTime()

        def writeTagScanningBatch(group: Seq[(String, Long)]): Future[Done] = {
          val statements: Seq[BoundStatement] = group.map {
            case (pid, seqNr) => ps.bind(pid, seqNr: JLong)
          }
          Future.traverse(statements)(tagWriterSession.executeWrite).map(_ => Done)
        }

        // Execute 10 async statements at a time to not introduce too much load see issue #408.
        val batchIterator: Iterator[Seq[(PersistenceId, SequenceNr)]] = updates.grouped(10)

        var result = Future.successful[Done](Done)
        for (item <- batchIterator) {
          result = result.flatMap { _ =>
            writeTagScanningBatch(item)
          }
        }

        result.onComplete { result =>
          self ! WriteTagScanningCompleted(result, startTime, updates.size)
          result.failed.foreach(self ! TagWriteFailed(_))
        }
      }
    } else {
      scheduleWriteTagScanningTick()
    }
  }