in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [140:237]
private def atLeastOnceProcessing(
source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
afterEnvelopes: Int,
orAfterDuration: FiniteDuration,
recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {
val atLeastOnceHandlerFlow
: Flow[ProjectionContextImpl[Offset, Envelope], ProjectionContextImpl[Offset, Envelope], NotUsed] =
handlerStrategy match {
case single: SingleHandlerStrategy[Envelope @unchecked] =>
val handler = single.handler()
val handlerRecovery =
HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)
Flow[ProjectionContextImpl[Offset, Envelope]].mapAsync(parallelism = 1) { context =>
val measured: () => Future[Done] = { () =>
handler.process(context.envelope).map { done =>
statusObserver.afterProcess(projectionId, context.envelope)
// `telemetry.afterProcess` is invoked immediately after `handler.process`
telemetry.afterProcess(context.externalContext)
done
}
}
handlerRecovery
.applyRecovery(context.envelope, context.offset, context.offset, abort.future, measured)
.map { _ =>
context
}
}
case grouped: GroupedHandlerStrategy[Envelope @unchecked] =>
val groupAfterEnvelopes = grouped.afterEnvelopes.getOrElse(settings.groupAfterEnvelopes)
val groupAfterDuration = grouped.orAfterDuration.getOrElse(settings.groupAfterDuration)
val handler = grouped.handler()
val handlerRecovery =
HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)
Flow[ProjectionContextImpl[Offset, Envelope]]
.groupedWithin(groupAfterEnvelopes, groupAfterDuration)
.filterNot(_.isEmpty)
.mapAsync(parallelism = 1) { group =>
val first = group.head
val last = group.last
val envelopes = group.map { _.envelope }
val measured: () => Future[Done] = { () =>
handler.process(envelopes).map { done =>
group.foreach { ctx =>
statusObserver.afterProcess(projectionId, ctx.envelope)
telemetry.afterProcess(ctx.externalContext)
}
done
}
}
handlerRecovery
.applyRecovery(first.envelope, first.offset, last.offset, abort.future, measured)
.map { _ =>
last.copy(groupSize = envelopes.length)
}
}
case f: FlowHandlerStrategy[Envelope @unchecked] =>
val flow =
f.flowCtx.asFlow.watchTermination() {
case (_, futDone) =>
futDone.recoverWith {
case t =>
telemetry.error(t)
futDone
}
}
Flow[ProjectionContextImpl[Offset, Envelope]]
.map { context => context.envelope -> context }
.via(flow)
.map {
case (_, context) =>
val ctx = context.asInstanceOf[ProjectionContextImpl[Offset, Envelope]]
statusObserver.afterProcess(projectionId, ctx.envelope)
telemetry.afterProcess(ctx.externalContext)
ctx
}
}
if (afterEnvelopes == 1)
// optimization of general AtLeastOnce case
source.via(atLeastOnceHandlerFlow).mapAsync(1) { context =>
saveOffsetAndReport(projectionId, context, context.groupSize)
}
else {
source
.via(atLeastOnceHandlerFlow)
.groupedWithin(afterEnvelopes, orAfterDuration)
.filterNot(_.isEmpty)
.mapAsync(parallelism = 1) { batch =>
saveOffsetsAndReport(projectionId, batch)
}
}
}