private[stream] def createProcessorFlow()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala [259:313]


  /**
    * Assemble the flow that consumes `DataSources` updates and produces the
    * `MessageEnvelope` output stream.
    *
    * Every incoming `DataSources` value is logged for replay, validated, and stored
    * on the context. It is then broadcast to two branches: one that runs the input
    * flow and groups its output by step, and one that forwards the `DataSources`
    * unchanged. Both branches are merged and handed to the final expression
    * evaluation, which is also partitioned per step.
    *
    * @param context
    *     Stream context used for validation, for the monitoring stages, and for the
    *     interpreter passed to the final evaluation.
    * @return
    *     Flow from `DataSources` to `MessageEnvelope`.
    */
  private[stream] def createProcessorFlow(
    context: StreamContext
  ): Flow[DataSources, MessageEnvelope, NotUsed] = {

    val fanOutAndMerge = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits.*

      // Fan-out: each DataSources goes to both the input flow and the final eval step
      val sourcesFanOut = builder.add(Broadcast[DataSources](2))

      // Fan-in: typed as AnyRef so the intermediate output and the DataSources can be
      // fed into the final evaluation step through a single stream
      val evalFanIn = builder.add(Merge[AnyRef](2))

      // Input side: batches are converted, split up via groupByStep, and then handled
      // in one substream per step before being merged back together
      val groupedInput = createInputFlow(context)
        .via(context.monitorFlow("10_InputBatches"))
        .via(new LwcToAggrDatapoint(context))
        .flatMapConcat(batch => Source(batch.groupByStep))
        .groupBy(Int.MaxValue, part => part.step, allowClosedSubstreamRecreation = true)
        .via(new TimeGrouped(context))
        .mergeSubstreams
        .via(context.monitorFlow("11_GroupedDatapoints"))

      // Each broadcast output gets a single element buffer with dropTail. When the
      // buffer is full the buffered element is dropped in favor of the new one, so a
      // slow branch does not back pressure the broadcast and stall the other branch.
      val inputBranch = sourcesFanOut.out(0).buffer(1, OverflowStrategy.dropTail)
      inputBranch ~> groupedInput ~> evalFanIn.in(0)

      val passThroughBranch = sourcesFanOut.out(1).buffer(1, OverflowStrategy.dropTail)
      passThroughBranch ~> evalFanIn.in(1)

      // Expose the graph as a single-input, single-output shape
      FlowShape(sourcesFanOut.in, evalFanIn.out)
    }

    // Pre-processing applied to every DataSources before it enters the graph: log it,
    // validate it, and record the validated value on the context
    val validatedSources = Flow[DataSources]
      .map(ReplayLogging.log)
      .map { incoming =>
        val checked = context.validate(incoming)
        context.setDataSources(checked)
        checked
      }

    // Final evaluation of the overall expression. The merged stream is split with
    // splitByStep and keyed with stepSize (both defined outside this method) so that
    // there is one FinalExprEval per key. The sources it emits are flattened into the
    // resulting message stream.
    validatedSources
      .via(fanOutAndMerge)
      .flatMapConcat(merged => Source(splitByStep(merged)))
      .groupBy(Int.MaxValue, stepSize, allowClosedSubstreamRecreation = true)
      .via(new FinalExprEval(context.interpreter, enableNoDataMsgs))
      .mergeSubstreams
      .via(context.monitorFlow("12_OutputSources"))
      .flatMapConcat(messages => messages)
      .via(context.monitorFlow("13_OutputMessages"))
  }