in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [106:138]
private def serialize(batches: Map[String, immutable.Seq[ProjectionContextImpl[Offset, Envelope]]])(
op: (String, immutable.Seq[ProjectionContextImpl[Offset, Envelope]]) => Future[Done]): Future[Done] = {
val logProgressEvery: Int = 5
val size = batches.size
logger.debug("Processing [{}] partitioned batches serially", size)
def loop(
remaining: List[(String, immutable.Seq[ProjectionContextImpl[Offset, Envelope]])],
n: Int): Future[Done] = {
remaining match {
case Nil => Future.successful(Done)
case (key, batch) :: tail =>
op(key, batch).flatMap { _ =>
if (n % logProgressEvery == 0)
logger.debug("Processed batches [{}] of [{}]", n, size)
loop(tail, n + 1)
}
}
}
val result = loop(batches.toList, n = 1)
result.onComplete {
case Success(_) =>
logger.debug("Processing completed of [{}] batches", size)
case Failure(e) =>
logger.error(e, "Processing of batches failed")
}
result
}