in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [239:364]
/**
 * Builds the processing stage used for the exactly-once offset strategy.
 *
 * Every handler invocation is wrapped in a single shared `HandlerRecoveryImpl`, which receives
 * both the handler call and a skip callback that delegates to `saveOffsetAndReport` /
 * `saveOffsetsAndReport`.
 *
 * - Single handlers: envelopes are processed one at a time (`mapAsync` with parallelism 1).
 * - Grouped handlers: envelopes are batched with `groupedWithin` and empty groups are dropped.
 *   When the source provider is a `MergeableOffsetSourceProvider`, each group is further split
 *   by the keys of its `MergeableOffset` entries, and the per-key batches run one after another
 *   through `serialize`.
 * - Flow handlers are rejected with an `IllegalStateException`.
 *
 * @param source           the stream of envelopes together with their offsets and contexts
 * @param recoveryStrategy how handler failures are retried, skipped or propagated
 * @return a source that emits `Done` once per processed envelope or group
 */
private def exactlyOnceProcessing(
    source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
    recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {

  // One recovery helper shared by every handler invocation of this stream.
  val handlerRecovery =
    HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)

  // Tells the status observer, then telemetry, that the user handler finished one envelope.
  def notifyProcessed(ctx: ProjectionContextImpl[Offset, Envelope]): Unit = {
    statusObserver.afterProcess(projectionId, ctx.envelope)
    // `telemetry.afterProcess` is invoked immediately after `handler.process`
    telemetry.afterProcess(ctx.externalContext)
  }

  // Best-effort progress notification: a non-fatal failure of the observer is deliberately ignored.
  def notifyOffsetProgress(envelope: Envelope): Unit = {
    try {
      statusObserver.offsetProgress(projectionId, envelope)
    } catch {
      case NonFatal(_) => // ignore
    }
  }

  // Handles one `groupedWithin` group, splitting it per MergeableOffset key when applicable.
  def processGroup(
      handler: Handler[immutable.Seq[Envelope]],
      group: immutable.Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {

    // Invokes the handler once for a whole (non-empty) batch, under the recovery strategy.
    def runBatch(batch: immutable.Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {
      val headCtx = batch.head
      val fromOffset = headCtx.offset
      val toOffset = batch.last.offset
      val batchEnvelopes = batch.map(_.envelope)
      val invokeHandler: () => Future[Done] = () =>
        handler.process(batchEnvelopes).map { done =>
          batch.foreach(notifyProcessed)
          done
        }
      val skipBatch = () => saveOffsetsAndReport(projectionId, batch)
      handlerRecovery.applyRecovery(headCtx.envelope, fromOffset, toOffset, abort.future, invokeHandler, skipBatch)
    }

    sourceProvider match {
      case _: MergeableOffsetSourceProvider[Offset, Envelope] =>
        // Pair each context with every key of its MergeableOffset.
        val keyedContexts = group.flatMap {
          case ctx @ ProjectionContextImpl(offset: MergeableOffset[_] @unchecked, _, _, _) =>
            offset.entries.toSeq.map { case (key, _) => key -> ctx }
          case _ =>
            // should never happen
            throw new IllegalStateException("The offset should always be of type MergeableOffset")
        }
        val batchesByKey = keyedContexts
          .groupBy { case (key, _) => key }
          .map { case (key, pairs) =>
            key -> pairs.map { case (_, ctx) => ctx.asInstanceOf[ProjectionContextImpl[Offset, Envelope]] }
          }
        // process batches in sequence, but not concurrently, in order to provide singled threaded guarantees
        // to the user envelope handler
        serialize(batchesByKey) {
          case (surrogateKey, keyedBatch) =>
            logger.debug("Processing grouped envelopes for MergeableOffset with key [{}]", surrogateKey)
            runBatch(keyedBatch)
        }
      case _ =>
        runBatch(group)
    }
  }

  handlerStrategy match {
    case single: SingleHandlerStrategy[Envelope @unchecked] =>
      val handler: Handler[Envelope] = single.handler()
      source.mapAsync(parallelism = 1) { ctx =>
        val invokeHandler: () => Future[Done] = () =>
          handler.process(ctx.envelope).map { done =>
            notifyProcessed(ctx)
            notifyOffsetProgress(ctx.envelope)
            telemetry.onOffsetStored(1)
            done
          }
        val skipEnvelope = () => saveOffsetAndReport(projectionId, ctx, 1)
        handlerRecovery.applyRecovery(ctx.envelope, ctx.offset, ctx.offset, abort.future, invokeHandler, skipEnvelope)
      }

    case grouped: GroupedHandlerStrategy[Envelope @unchecked] =>
      val maxEnvelopes = grouped.afterEnvelopes.getOrElse(settings.groupAfterEnvelopes)
      val maxDuration = grouped.orAfterDuration.getOrElse(settings.groupAfterDuration)
      val handler = grouped.handler()
      source
        .groupedWithin(maxEnvelopes, maxDuration)
        .filterNot(_.isEmpty)
        .mapAsync(1) { group =>
          val lastCtx: ProjectionContextImpl[Offset, Envelope] = group.last
          processGroup(handler, group).map { done =>
            notifyOffsetProgress(lastCtx.envelope)
            telemetry.onOffsetStored(group.length)
            done
          }
        }

    case _: FlowHandlerStrategy[_] =>
      // not possible, no API for this
      throw new IllegalStateException("Unsupported combination of exactlyOnce and flow")
  }
}