in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [400:462]
def mappedSource(): Source[Done, Future[Done]] = {
val handlerLifecycle = handlerStrategy.lifecycle
statusObserver.started(projectionId)
telemetry = TelemetryProvider.start(projectionId, system)
val source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed] =
Source
.futureSource(readPaused().flatMap {
case false =>
logger.debug("Projection [{}] started in resumed mode.", projectionId)
handlerLifecycle
.tryStart()
.flatMap { _ =>
sourceProvider
.source(() => readOffsets())
}
case true =>
logger.info("Projection [{}] started in paused mode.", projectionId)
// paused stream, no elements
Future.successful(Source.never[Envelope])
})
.via(killSwitch.flow)
.map { env =>
statusObserver.beforeProcess(projectionId, env)
val externalContext = telemetry.beforeProcess(env, sourceProvider.extractCreationTime(env))
ProjectionContextImpl(sourceProvider.extractOffset(env), env, externalContext)
}
.filter { context =>
sourceProvider match {
case vsp: VerifiableSourceProvider[Offset, Envelope] =>
vsp.verifyOffset(context.offset) match {
case VerificationSuccess => true
case VerificationFailure(reason) =>
logger.warning(
"Source provider instructed projection to skip offset [{}] with reason: {}",
context.offset,
reason)
false
}
case _ => true
}
}
.mapMaterializedValue(_ => NotUsed)
val composedSource: Source[Done, NotUsed] =
offsetStrategy match {
case ExactlyOnce(recoveryStrategyOpt) =>
exactlyOnceProcessing(source, recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
case AtLeastOnce(afterEnvelopesOpt, orAfterDurationOpt, recoveryStrategyOpt) =>
atLeastOnceProcessing(
source,
afterEnvelopesOpt.getOrElse(settings.saveOffsetAfterEnvelopes),
orAfterDurationOpt.getOrElse(settings.saveOffsetAfterDuration),
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
case AtMostOnce(recoveryStrategyOpt) =>
atMostOnceProcessing(source, recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
}
stopHandlerOnTermination(composedSource, handlerLifecycle)
}