private def serialize()

in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [106:138]


  /**
   * Applies `op` to every keyed batch strictly one after another: `op` for the
   * next entry is only invoked once the future returned for the previous entry
   * has completed successfully. Entries are visited in the order produced by
   * `batches.toList`.
   *
   * Progress is reported at debug level after every fifth batch; the overall
   * outcome is logged when the chain finishes (debug on success, error on failure).
   *
   * @param batches the batches to process, keyed by partition/group key
   * @param op      the asynchronous operation to run for each `(key, batch)` pair
   * @return a future completed with `Done` once all batches have been processed,
   *         or failed with the first failure produced while running `op`
   */
  private def serialize(batches: Map[String, immutable.Seq[ProjectionContextImpl[Offset, Envelope]]])(
      op: (String, immutable.Seq[ProjectionContextImpl[Offset, Envelope]]) => Future[Done]): Future[Done] = {

    val progressInterval = 5
    val total = batches.size
    logger.debug("Processing [{}] partitioned batches serially", total)

    // `position` is the 1-based index of the batch currently at the head of `pending`.
    def processNext(
        pending: List[(String, immutable.Seq[ProjectionContextImpl[Offset, Envelope]])],
        position: Int): Future[Done] =
      if (pending.isEmpty) Future.successful(Done)
      else {
        val (key, batch) = pending.head
        val rest = pending.tail
        // Chaining through flatMap is what enforces the serial execution:
        // the recursive call happens only after this batch's future succeeds.
        op(key, batch).flatMap { _ =>
          val reportProgress = position % progressInterval == 0
          if (reportProgress) logger.debug("Processed batches [{}] of [{}]", position, total)
          processNext(rest, position + 1)
        }
      }

    val completion = processNext(batches.toList, position = 1)

    // Logging is attached as a side callback; the future handed back to the
    // caller is the chain itself, not a derived one.
    completion.onComplete { outcome =>
      outcome match {
        case Failure(cause) => logger.error(cause, "Processing of batches failed")
        case Success(_)     => logger.debug("Processing completed of [{}] batches", total)
      }
    }

    completion
  }