in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala [70:352]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {
// Maintains the state for each expression we need to evaluate. TODO: implement
// limits to sanity check against running out of memory
private val states =
scala.collection.mutable.HashMap.empty[StyleExpr, Map[StatefulExpr, Any]]
// Step size for datapoints flowing through. It is determined by the first DataSources
// message that arrives and should be consistent for the life of this stage
private var step = -1L
// Sampled event expressions
private var sampledEventRecipients = Map.empty[DataExpr, List[ExprInfo]]
// Each expression matched with a list of data source ids that should receive
// the data for it
private var recipients = List.empty[(StyleExpr, List[ExprInfo])]
// Track the set of DataExprs per DataSource
private var dataSourceIdToDataExprs = Map.empty[String, Set[DataExpr]]
// Empty data map used as base to account for expressions that do not have any
// matches for a given time interval
private var noData = Map.empty[DataExpr, List[TimeSeries]]
private def error(expr: String, hint: String, t: Throwable): DiagnosticMessage = {
val str = s"$hint [[$expr]]: ${t.getClass.getSimpleName}: ${t.getMessage}"
DiagnosticMessage.error(str)
}
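// For example (illustrative, not produced by this code): a URI that fails to parse, such
// as a query with an unknown stack word, would surface to its data source as a message
// along the lines of:
//   invalid expression [[/api/v1/graph?q=name,cpu,:eq,:frobnicate]]: IllegalStateException: unknown word ':frobnicate'
// The exception class and message depend on what the interpreter throws.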
// Updates the recipients list
private def handleDataSources(ds: DataSources): Unit = {
import scala.jdk.CollectionConverters.*
val sources = ds.sources.asScala.toList
step = ds.stepSize()
// Get set of expressions before we update the list
val previous = recipients.map(t => t._1 -> t._1).toMap
// Compute set of sampled event expressions
sampledEventRecipients = sources
.flatMap { s =>
exprInterpreter.parseSampleExpr(Uri(s.uri)).map { expr =>
expr.dataExpr -> ExprInfo(s.id, None)
}
}
.groupBy(_._1)
.map(t => t._1 -> t._2.map(_._2))
// Error messages for invalid expressions
val errors = List.newBuilder[MessageEnvelope]
// Compute the new set of expressions
recipients = sources
.flatMap { s =>
try {
exprInterpreter.evalTimeSeries(Uri(s.uri)).toList.flatMap { graphCfg =>
val exprs = graphCfg.exprs
// Reuse the previously evaluated expression instance if available. State for stateful
// expressions is keyed by an identity map, so if the instance changes the state will
// be reset.
exprs.map { e =>
val paletteName =
if (graphCfg.flags.presentationMetadataEnabled) {
val axis = e.axis.getOrElse(0)
Some(graphCfg.flags.axisPalette(graphCfg.settings, axis))
} else {
None
}
previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName)
}
}
} catch {
case e: Exception =>
errors += new MessageEnvelope(s.id, error(s.uri, "invalid expression", e))
Nil
}
}
.groupBy(_._1)
.map(t => t._1 -> t._2.map(_._2))
.toList
dataSourceIdToDataExprs = recipients
.flatMap(styleExprAndIds =>
styleExprAndIds._2.map(id => id -> styleExprAndIds._1.expr.dataExprs.toSet)
)
// Fold to mutable map to avoid creating new Map on every update
.foldLeft(mutable.Map.empty[String, Set[DataExpr]]) {
case (map, (info, dataExprs)) =>
map += map.get(info.id).fold(info.id -> dataExprs) { vs =>
info.id -> (dataExprs ++ vs)
}
}
.toMap
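// For example (hypothetical id and expressions): if data source "ds-1" subscribes to two
// style expressions that expand to the data expressions sum(name=cpu) and max(name=cpu),
// the fold above merges them into a single entry, "ds-1" -> Set(sum(...), max(...)),
// which is later used to attribute datapoint rates back to that data source.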
// Cleanup state for any expressions that are no longer needed
val removed = previous.keySet -- recipients.map(_._1).toSet
removed.foreach { expr =>
states -= expr
}
// Setup no data map
noData = recipients
.flatMap(_._1.expr.dataExprs)
.distinct
.map {
// If there is no grouping, then use a no-data line, otherwise use an empty list
case e if e.finalGrouping.isEmpty =>
e -> List(TimeSeries.noData(e.query, step))
case e =>
e -> Nil
}
.toMap
push(out, Source(errors.result()))
}
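// Sketch of the fan-out computed above (hypothetical ids): if "ds-1" and "ds-2" request
// the same expression, grouping by expression collapses them into a single entry,
//   recipients == List(styleExpr -> List(ExprInfo("ds-1", None), ExprInfo("ds-2", None)))
// so the expression is evaluated once per time group and the result is addressed to both
// data source ids in handleData.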
// Generate a no data line for a full expression. Use the tagging information from the
// first data expression that is found.
private def noData(expr: StyleExpr): TimeSeries = {
expr.expr.dataExprs.headOption match {
case Some(e) => TimeSeries.noData(e.query, step)
case None => TimeSeries.noData(step)
}
}
// Perform the final evaluation and create a source with the TimeSeriesMessages
// addressed to each recipient
private def handleData(group: TimeGroup): List[MessageEnvelope] = {
// Finalize the DataExprs, needed as input for further evaluation
val timestamp = group.timestamp
val groupedDatapoints = group.dataExprValues
// Messages for sampled events that look similar to time series
val sampledEventMessages = groupedDatapoints.flatMap {
case (expr, vs) =>
sampledEventRecipients
.get(expr)
.fold(List.empty[MessageEnvelope]) { infos =>
val ts = vs.values.map(toTimeSeriesMessage)
ts.flatMap { msg =>
infos.map { info =>
new MessageEnvelope(info.id, msg)
}
}
}
}.toList
// Data for each time series data expression or no-data line if there is no data for
// the interval
val dataExprToDatapoints = noData ++ groupedDatapoints.map {
case (k, vs) =>
k -> vs.values.map(_.toTimeSeries)
}
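// For example (illustrative): an ungrouped expression with no datapoints this interval
// keeps its no-data line from `noData`, a grouped expression keeps its empty list, and
// any expression present in groupedDatapoints has its default entry replaced by the
// actual time series via the `++` above.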
// Collect input and intermediate data size per DataSource
val rateCollector = new EvalDataRateCollector(timestamp, step)
dataSourceIdToDataExprs.foreach {
case (id, dataExprSet) =>
dataExprSet.foreach { dataExpr =>
group.dataExprValues.get(dataExpr).foreach { info =>
rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints)
rateCollector.incrementIntermediate(id, dataExpr, info.values.size)
}
}
}
// Generate the time series and diagnostic output
val output = recipients
.flatMap {
case (styleExpr, infos) =>
val exprStr = styleExpr.toString
val ids = infos.map(_.id)
// Use an identity map for the state to ensure that multiple equivalent stateful
// expressions, e.g. derivative(a) + derivative(a), will have isolated state.
val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any])
val context = EvalContext(timestamp, timestamp + step, step, state)
try {
val result = styleExpr.expr.eval(context, dataExprToDatapoints)
states(styleExpr) = result.state
val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data
// Collect final data size per DataSource
ids.foreach(rateCollector.incrementOutput(_, data.size))
// Create time series messages
infos.flatMap { info =>
data.map { t =>
val ts = TimeSeriesMessage(
styleExpr,
context,
t.withLabel(styleExpr.legend(t)),
info.palette,
Some(exprStr)
)
new MessageEnvelope(info.id, ts)
}
}
} catch {
case e: Exception =>
val msg = error(styleExpr.toString, "final eval failed", e)
ids.map { id =>
new MessageEnvelope(id, msg)
}
}
}
.filter { env =>
enableNoDataMsgs || hasFiniteValue(env.message())
}
val rateMessages = rateCollector.getAll.map {
case (id, rate) => new MessageEnvelope(id, rate)
}.toList
sampledEventMessages ++ output ++ rateMessages
}
private def hasFiniteValue(value: AnyRef): Boolean = {
value match {
case ts: TimeSeriesMessage => valueNotNaN(ts.data)
case _ => true
}
}
private def valueNotNaN(value: ChunkData): Boolean = {
value match {
case ArrayData(vs) => vs.exists(!_.isNaN)
case _ => true
}
}
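// For example (illustrative): with enableNoDataMsgs disabled, a message whose data is
// ArrayData(Array(Double.NaN)) has no finite value and is filtered out of the output,
// while ArrayData(Array(1.0, Double.NaN)) still passes the filter.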
private def toTimeSeriesMessage(value: AggrDatapoint): TimeSeriesMessage = {
// For sampled messages, convert the value to a rate per step, i.e. the raw count of
// events for the step, rather than a rate per second.
val secondsPerStep = step / 1000.0
val ratePerStep = value.value * secondsPerStep
val id = TaggedItem.computeId(value.tags + ("atlas.query" -> value.source)).toString
TimeSeriesMessage(
id,
value.source,
value.expr.finalGrouping,
value.timestamp,
value.timestamp + step,
step,
TimeSeries.toLabel(value.tags),
value.tags,
ArrayData(Array(ratePerStep)),
None,
value.samples
)
}
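// Worked example (hypothetical values): with a one-minute step (step == 60000),
// secondsPerStep is 60000 / 1000.0 == 60.0, so a sampled event rate of 0.5 events/second
// becomes ratePerStep == 0.5 * 60.0 == 30.0, i.e. roughly 30 events observed during the
// step rather than a per-second rate.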
private def handleSingleGroup(g: TimeGroup): Unit = {
push(out, Source(handleData(g)))
}
private def handleGroups(t: TimeGroupsTuple): Unit = {
val msgs = List.newBuilder[MessageEnvelope]
msgs ++= t.messages
msgs ++= t.groups.flatMap(handleData)
push(out, Source(msgs.result()))
}
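// Dispatch upstream elements: DataSources updates the recipient mappings, while TimeGroup
// and TimeGroupsTuple carry the aggregated data to evaluate for one or more intervals.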
override def onPush(): Unit = {
grab(in) match {
case ds: DataSources => handleDataSources(ds)
case data: TimeGroup => handleSingleGroup(data)
case t: TimeGroupsTuple => handleGroups(t)
case v => throw new MatchError(v)
}
}
override def onPull(): Unit = {
pull(in)
}
override def onUpstreamFinish(): Unit = {
completeStage()
}
setHandlers(in, out, this)
}
}
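// Usage sketch (assumption: constructor arguments and wiring come from the surrounding
// project, not this excerpt). The stage pushes Source[MessageEnvelope] elements, so a
// downstream typically flattens them, e.g.:
//   Source(inputs)                          // DataSources / TimeGroup / TimeGroupsTuple
//     .via(new FinalExprEval(interpreter))
//     .flatMapConcat(identity)              // one Source[MessageEnvelope] per push
//     .runForeach(env => println(env.message()))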