private def exactlyOnceProcessing()

in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [239:364]


  /**
   * Wires the exactly-once processing pipeline on top of `source`, according to `handlerStrategy`.
   *
   *  - `SingleHandlerStrategy`: envelopes are handed to the handler one by one (`mapAsync(1)`).
   *  - `GroupedHandlerStrategy`: envelopes are batched with `groupedWithin` (batch size and duration
   *    come from the strategy, falling back to `settings`) and every non-empty batch is handed to the
   *    handler, one batch at a time.
   *  - `FlowHandlerStrategy`: rejected with an `IllegalStateException`.
   *
   * Every handler call is passed to `HandlerRecoveryImpl.applyRecovery` together with an `onSkip`
   * callback that calls `saveOffsetAndReport` / `saveOffsetsAndReport` for the affected envelope(s).
   * NOTE(review): whether `applyRecovery` retries, skips or fails is decided by `HandlerRecoveryImpl`
   * and `recoveryStrategy`; that logic is not visible from this method.
   *
   * @param source contexts (offset, envelope, external context) to process
   * @param recoveryStrategy recovery strategy used to build the `HandlerRecoveryImpl`
   * @return a source emitting one `Done` per envelope (single) or per group (grouped)
   */
  private def exactlyOnceProcessing(
      source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
      recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {

    val handlerRecovery =
      HandlerRecoveryImpl[Offset, Envelope](projectionId, recoveryStrategy, logger, statusObserver, telemetry)

    // Best-effort notification of the status observer: any non-fatal failure it throws is dropped
    // on purpose so that it cannot fail the processing of the envelope(s).
    def reportOffsetProgress(envelope: Envelope): Unit = {
      try {
        statusObserver.offsetProgress(projectionId, envelope)
      } catch {
        case NonFatal(_) => // ignore
      }
    }

    // Hands one group of contexts to the grouped handler. When the source provider is a
    // MergeableOffsetSourceProvider the group is first split per offset key.
    def processGrouped(
        groupHandler: Handler[immutable.Seq[Envelope]],
        envelopesAndOffsets: immutable.Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {

      // Runs the handler once for all envelopes of `contexts`, under the recovery strategy.
      def processEnvelopes(contexts: immutable.Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {
        val head = contexts.head
        val payloads = contexts.map(ctx => ctx.envelope)

        def invokeHandler(): Future[Done] =
          groupHandler.process(payloads).map { done =>
            // after a successful handler call, notify observer and telemetry for every envelope
            contexts.foreach { ctx =>
              statusObserver.afterProcess(projectionId, ctx.envelope)
              telemetry.afterProcess(ctx.externalContext)
            }
            done
          }

        val measured: () => Future[Done] = () => invokeHandler()
        val onSkip = () => saveOffsetsAndReport(projectionId, contexts)
        handlerRecovery.applyRecovery(
          head.envelope,
          head.offset,
          contexts.last.offset,
          abort.future,
          measured,
          onSkip)
      }

      sourceProvider match {
        case _: MergeableOffsetSourceProvider[_, _] =>
          // pair every context with each key of its MergeableOffset, then collect the contexts per key
          val batches = envelopesAndOffsets
            .flatMap {
              case ctx @ ProjectionContextImpl(mergeable: MergeableOffset[_] @unchecked, _, _, _) =>
                mergeable.entries.toSeq.map {
                  case (entryKey, _) => (entryKey, ctx)
                }
              case _ =>
                // should never happen
                throw new IllegalStateException("The offset should always be of type MergeableOffset")
            }
            .groupBy(_._1)
            .map {
              case (entryKey, pairs) =>
                entryKey -> pairs.map {
                  case (_, ctx) => ctx.asInstanceOf[ProjectionContextImpl[Offset, Envelope]]
                }
            }

          // the batches are handed to `serialize` so that they are processed one after the other and
          // never concurrently, giving the user envelope handler single threaded guarantees
          serialize(batches) {
            case (batchKey, batchContexts) =>
              logger.debug("Processing grouped envelopes for MergeableOffset with key [{}]", batchKey)
              processEnvelopes(batchContexts)
          }

        case _ =>
          processEnvelopes(envelopesAndOffsets)
      }
    }

    handlerStrategy match {
      case single: SingleHandlerStrategy[Envelope @unchecked] =>
        val envelopeHandler: Handler[Envelope] = single.handler()

        // Processes a single envelope under the recovery strategy.
        def processSingle(context: ProjectionContextImpl[Offset, Envelope]): Future[Done] = {
          val measured: () => Future[Done] = () =>
            envelopeHandler.process(context.envelope).map { done =>
              statusObserver.afterProcess(projectionId, context.envelope)
              // `telemetry.afterProcess` is invoked immediately after `handler.process`
              telemetry.afterProcess(context.externalContext)

              reportOffsetProgress(context.envelope)
              telemetry.onOffsetStored(1)

              done
            }

          val onSkip = () => saveOffsetAndReport(projectionId, context, 1)
          // a single envelope is both the first and the last offset of the unit of work
          handlerRecovery.applyRecovery(context.envelope, context.offset, context.offset, abort.future, measured, onSkip)
        }

        source.mapAsync(1) { context =>
          processSingle(context)
        }

      case grouped: GroupedHandlerStrategy[Envelope @unchecked] =>
        // strategy-specific values win over the global settings
        val maxEnvelopes = grouped.afterEnvelopes.getOrElse(settings.groupAfterEnvelopes)
        val maxDuration = grouped.orAfterDuration.getOrElse(settings.groupAfterDuration)
        val groupHandler = grouped.handler()

        source
          .groupedWithin(maxEnvelopes, maxDuration)
          .filterNot(_.isEmpty)
          .mapAsync(parallelism = 1) { group =>
            // safe: empty groups were filtered out above
            val lastContext: ProjectionContextImpl[Offset, Envelope] = group.last
            processGrouped(groupHandler, group).map { result =>
              reportOffsetProgress(lastContext.envelope)
              telemetry.onOffsetStored(group.length)
              result
            }
          }

      case _: FlowHandlerStrategy[_] =>
        // not possible, no API for this
        throw new IllegalStateException("Unsupported combination of exactlyOnce and flow")
    }
  }