in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [366:398]
private def atMostOnceProcessing(
source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {
val handlerRecovery =
HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)
handlerStrategy match {
case single: SingleHandlerStrategy[Envelope @unchecked] =>
val handler = single.handler()
source
.mapAsync(parallelism = 1) { context =>
saveOffsetAndReport(projectionId, context, 1).flatMap { _ =>
val measured: () => Future[Done] = { () =>
handler.process(context.envelope).map { done =>
statusObserver.afterProcess(projectionId, context.envelope)
// `telemetry.afterProcess` is invoked immediately after `handler.process`
telemetry.afterProcess(context.externalContext)
done
}
}
handlerRecovery
.applyRecovery(context.envelope, context.offset, context.offset, abort.future, measured)
}
}
.map(_ => Done)
case _ =>
// not possible, no API for this
throw new IllegalStateException("Unsupported combination of atMostOnce and grouped")
}
}