in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala [259:313]
/**
 * Builds the flow that turns a stream of data source updates into output messages.
 *
 * Every incoming [[DataSources]] value is passed through `ReplayLogging.log`, validated with
 * `context.validate`, and recorded via `context.setDataSources`. The validated value is then
 * broadcast to two branches whose outputs are merged back together:
 *
 *  - the input branch: `createInputFlow`, then `LwcToAggrDatapoint`, then the results are
 *    split with `groupByStep`, sub-streamed by `step`, and run through `TimeGrouped`;
 *  - the pass-through branch: the validated data sources themselves.
 *
 * The merged elements are split with `splitByStep`, sub-streamed by `stepSize`, evaluated
 * with [[FinalExprEval]], and the resulting sources are flattened into individual messages.
 *
 * @param context
 *   stream context used for validation, the interpreter, and the monitoring flows.
 * @return
 *   flow from data source updates to message envelopes.
 */
private[stream] def createProcessorFlow(
  context: StreamContext
): Flow[DataSources, MessageEnvelope, NotUsed] = {

  val fanOutAndMerge = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits.*

    // Two consumers of each update: port 0 feeds the input branch, port 1 passes through.
    val fanOut = builder.add(Broadcast[DataSources](2))

    // Final evaluation input: grouped datapoints on port 0, data sources on port 1.
    val fanIn = builder.add(Merge[AnyRef](2))

    val inputBranch = createInputFlow(context)
      .via(context.monitorFlow("10_InputBatches"))
      .via(new LwcToAggrDatapoint(context))
      .flatMapConcat(batch => Source(batch.groupByStep))
      .groupBy(Int.MaxValue, _.step, allowClosedSubstreamRecreation = true)
      .via(new TimeGrouped(context))
      .mergeSubstreams
      .via(context.monitorFlow("11_GroupedDatapoints"))

    // A one-slot "buffer + dropTail" in front of each branch means a busy branch does not
    // back-pressure the broadcast, so neither branch can block the other.
    val inputSide = fanOut.out(0).buffer(1, OverflowStrategy.dropTail)
    val passThroughSide = fanOut.out(1).buffer(1, OverflowStrategy.dropTail)

    inputSide ~> inputBranch ~> fanIn.in(0)
    passThroughSide ~> fanIn.in(1)

    // Externally the graph behaves as a single flow: DataSources in, merged elements out.
    FlowShape(fanOut.in, fanIn.out)
  }

  // Evaluate the merged elements per step size and flatten the emitted sources.
  val perStepEval = Flow[AnyRef]
    .flatMapConcat(item => Source(splitByStep(item)))
    .groupBy(Int.MaxValue, stepSize, allowClosedSubstreamRecreation = true)
    .via(new FinalExprEval(context.interpreter, enableNoDataMsgs))
    .mergeSubstreams
    .via(context.monitorFlow("12_OutputSources"))
    .flatMapConcat(envelopes => envelopes)
    .via(context.monitorFlow("13_OutputMessages"))

  Flow[DataSources]
    .map(ReplayLogging.log)
    .map { update =>
      val validated = context.validate(update)
      context.setDataSources(validated)
      validated
    }
    .via(fanOutAndMerge)
    .via(perStepEval)
}