def mappedSource()

in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [400:462]


  /**
   * Builds the stream that drives this projection.
   *
   * Before any stage is assembled, the status observer is told the projection has started and
   * telemetry is initialised through `TelemetryProvider.start`. Each envelope is then wrapped in a
   * `ProjectionContextImpl`, dropped if a `VerifiableSourceProvider` rejects its offset, and handed
   * to the processing variant selected by `offsetStrategy`.
   *
   * When `readPaused()` completes with `true`, the envelope source never emits and
   * `handlerLifecycle.tryStart()` is not invoked from this method.
   *
   * @return the composed source after it has been passed to `stopHandlerOnTermination`
   */
  def mappedSource(): Source[Done, Future[Done]] = {
    val handlerLifecycle = handlerStrategy.lifecycle
    statusObserver.started(projectionId)
    telemetry = TelemetryProvider.start(projectionId, system)

    // Reports one envelope to the observer and telemetry, then pairs it with its offset.
    def toContext(envelope: Envelope): ProjectionContextImpl[Offset, Envelope] = {
      statusObserver.beforeProcess(projectionId, envelope)
      val externalContext = telemetry.beforeProcess(envelope, sourceProvider.extractCreationTime(envelope))
      val offset = sourceProvider.extractOffset(envelope)
      ProjectionContextImpl(offset, envelope, externalContext)
    }

    // Lets every context through unless a verifiable source provider rejects its offset.
    def offsetAccepted(context: ProjectionContextImpl[Offset, Envelope]): Boolean =
      sourceProvider match {
        case verifiable: VerifiableSourceProvider[Offset, Envelope] =>
          verifiable.verifyOffset(context.offset) match {
            case VerificationSuccess => true
            case VerificationFailure(reason) =>
              logger.warning(
                "Source provider instructed projection to skip offset [{}] with reason: {}",
                context.offset,
                reason)
              false
          }
        case _ => true
      }

    // The paused flag is read first; it decides which envelope source the stream will run on.
    val envelopesOnceReady = readPaused().flatMap { paused =>
      if (paused) {
        logger.info("Projection [{}] started in paused mode.", projectionId)
        // paused stream, no elements
        Future.successful(Source.never[Envelope])
      } else {
        logger.debug("Projection [{}] started in resumed mode.", projectionId)
        handlerLifecycle.tryStart().flatMap(_ => sourceProvider.source(() => readOffsets()))
      }
    }

    val source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed] =
      Source
        .futureSource(envelopesOnceReady)
        .via(killSwitch.flow)
        .map(toContext)
        .filter(offsetAccepted)
        .mapMaterializedValue(_ => NotUsed)

    // Values carried by the offset strategy take precedence; `settings` supplies the fallbacks.
    val composedSource: Source[Done, NotUsed] =
      offsetStrategy match {
        case ExactlyOnce(recoveryOverride) =>
          val recovery = recoveryOverride.getOrElse(settings.recoveryStrategy)
          exactlyOnceProcessing(source, recovery)

        case AtLeastOnce(envelopesOverride, durationOverride, recoveryOverride) =>
          val afterEnvelopes = envelopesOverride.getOrElse(settings.saveOffsetAfterEnvelopes)
          val orAfterDuration = durationOverride.getOrElse(settings.saveOffsetAfterDuration)
          val recovery = recoveryOverride.getOrElse(settings.recoveryStrategy)
          atLeastOnceProcessing(source, afterEnvelopes, orAfterDuration, recovery)

        case AtMostOnce(recoveryOverride) =>
          val recovery = recoveryOverride.getOrElse(settings.recoveryStrategy)
          atMostOnceProcessing(source, recovery)
      }

    stopHandlerOnTermination(composedSource, handlerLifecycle)

  }