private def atMostOnceProcessing()

in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [366:398]


  /**
   * Builds the at-most-once processing stage: for each element the offset is saved first and only
   * afterwards is the envelope handed to the handler. Elements are processed one at a time
   * (`mapAsync` with parallelism 1).
   *
   * Only a [[SingleHandlerStrategy]] is handled here; any other strategy results in an
   * `IllegalStateException`.
   *
   * @param source stream of projection contexts to process
   * @param recoveryStrategy strategy passed to the handler recovery that wraps every handler call
   * @return a stream emitting `Done` for each element that completed the handler recovery
   */
  private def atMostOnceProcessing(
      source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
      recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {

    val recovery =
      HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)

    handlerStrategy match {
      case single: SingleHandlerStrategy[Envelope @unchecked] =>
        val envelopeHandler = single.handler()

        // The handler call is wrapped in a thunk so that the recovery controls its execution
        // (presumably so it can be re-invoked on failure -- confirm against HandlerRecoveryImpl).
        def processAndNotify(ctx: ProjectionContextImpl[Offset, Envelope]): () => Future[Done] =
          () =>
            envelopeHandler.process(ctx.envelope).map { result =>
              statusObserver.afterProcess(projectionId, ctx.envelope)
              // `telemetry.afterProcess` is invoked immediately after `handler.process`
              telemetry.afterProcess(ctx.externalContext)
              result
            }

        source
          .mapAsync(parallelism = 1) { ctx =>
            // Offset is stored before the handler runs; the handler is only reached once the save succeeded.
            // NOTE(review): the literal 1 is presumably the number of envelopes covered by this offset -- confirm.
            saveOffsetAndReport(projectionId, ctx, 1).flatMap { _ =>
              // The same offset is passed twice (presumably first and last offset of a single-element range).
              recovery.applyRecovery(ctx.envelope, ctx.offset, ctx.offset, abort.future, processAndNotify(ctx))
            }
          }
          .map { _ =>
            Done
          }

      case _ =>
        // not possible, no API for this
        throw new IllegalStateException("Unsupported combination of atMostOnce and grouped")
    }

  }