private def atLeastOnceProcessing()

in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [140:237]


  private def atLeastOnceProcessing(
      source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
      afterEnvelopes: Int,
      orAfterDuration: FiniteDuration,
      recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {

    val atLeastOnceHandlerFlow
        : Flow[ProjectionContextImpl[Offset, Envelope], ProjectionContextImpl[Offset, Envelope], NotUsed] =
      handlerStrategy match {
        case single: SingleHandlerStrategy[Envelope @unchecked] =>
          val handler = single.handler()
          val handlerRecovery =
            HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)

          Flow[ProjectionContextImpl[Offset, Envelope]].mapAsync(parallelism = 1) { context =>
            val measured: () => Future[Done] = { () =>
              handler.process(context.envelope).map { done =>
                statusObserver.afterProcess(projectionId, context.envelope)
                // `telemetry.afterProcess` is invoked immediately after `handler.process`
                telemetry.afterProcess(context.externalContext)
                done
              }
            }
            handlerRecovery
              .applyRecovery(context.envelope, context.offset, context.offset, abort.future, measured)
              .map { _ =>
                context
              }

          }

        case grouped: GroupedHandlerStrategy[Envelope @unchecked] =>
          val groupAfterEnvelopes = grouped.afterEnvelopes.getOrElse(settings.groupAfterEnvelopes)
          val groupAfterDuration = grouped.orAfterDuration.getOrElse(settings.groupAfterDuration)
          val handler = grouped.handler()
          val handlerRecovery =
            HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)

          Flow[ProjectionContextImpl[Offset, Envelope]]
            .groupedWithin(groupAfterEnvelopes, groupAfterDuration)
            .filterNot(_.isEmpty)
            .mapAsync(parallelism = 1) { group =>
              val first = group.head
              val last = group.last
              val envelopes = group.map { _.envelope }
              val measured: () => Future[Done] = { () =>
                handler.process(envelopes).map { done =>
                  group.foreach { ctx =>
                    statusObserver.afterProcess(projectionId, ctx.envelope)
                    telemetry.afterProcess(ctx.externalContext)
                  }
                  done
                }
              }
              handlerRecovery
                .applyRecovery(first.envelope, first.offset, last.offset, abort.future, measured)
                .map { _ =>
                  last.copy(groupSize = envelopes.length)
                }
            }

        case f: FlowHandlerStrategy[Envelope @unchecked] =>
          val flow =
            f.flowCtx.asFlow.watchTermination() {
              case (_, futDone) =>
                futDone.recoverWith {
                  case t =>
                    telemetry.error(t)
                    futDone
                }
            }
          Flow[ProjectionContextImpl[Offset, Envelope]]
            .map { context => context.envelope -> context }
            .via(flow)
            .map {
              case (_, context) =>
                val ctx = context.asInstanceOf[ProjectionContextImpl[Offset, Envelope]]
                statusObserver.afterProcess(projectionId, ctx.envelope)
                telemetry.afterProcess(ctx.externalContext)
                ctx
            }
      }

    if (afterEnvelopes == 1)
      // optimization of general AtLeastOnce case
      source.via(atLeastOnceHandlerFlow).mapAsync(1) { context =>
        saveOffsetAndReport(projectionId, context, context.groupSize)
      }
    else {
      source
        .via(atLeastOnceHandlerFlow)
        .groupedWithin(afterEnvelopes, orAfterDuration)
        .filterNot(_.isEmpty)
        .mapAsync(parallelism = 1) { batch =>
          saveOffsetsAndReport(projectionId, batch)
        }
    }
  }