in atlas-core/src/main/scala/com/netflix/atlas/core/model/StatefulExpr.scala [256:303]
/**
  * Creates a fresh algorithm instance. `eval` calls this for a time series whose id has
  * no entry in the saved state map; otherwise the algorithm is rebuilt from that entry.
  */
protected def newAlgorithmInstance(context: EvalContext): OnlineAlgorithm
/** Data expressions of the wrapped `expr`, returned unchanged. */
def dataExprs: List[DataExpr] = expr.dataExprs
/** Reports whether the wrapped `expr` is grouped; this expression adds nothing of its own. */
def isGrouped: Boolean = expr.isGrouped
/** Returns whatever the wrapped `expr` yields as the group-by key for the given tags. */
def groupByKey(tags: Map[String, String]): Option[String] = expr.groupByKey(tags)
/** Final grouping of the wrapped `expr`, returned unchanged. */
def finalGrouping: List[String] = expr.finalGrouping
/**
  * Evaluates the wrapped expression and passes every resulting time series through an
  * online algorithm, keeping one saved algorithm state per series id.
  *
  * The state map is looked up in the wrapped result's state under this expression (a new
  * `StateMap` is used when there is no entry), updated in place, and returned as part of
  * the result.
  *
  * @param context
  *     Supplies the start and end used to bound each series. It is also handed to
  *     `newAlgorithmInstance` when a series has no saved state.
  * @param data
  *     Input data, forwarded unchanged to the wrapped expression.
  * @return
  *     Result set for this expression holding the transformed series and the wrapped
  *     result's state extended with this expression's state map.
  */
def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = {
  val inner = expr.eval(context, data)
  val algoStates = inner.state.getOrElse(this, new StateMap).asInstanceOf[StateMap]

  // Series that have data: replace each value with the algorithm's output for it.
  val transformed = inner.data.map { ts =>
    val seq = ts.data.bounded(context.start, context.end)
    val values = seq.data
    val n = values.length

    // Resume from the saved state when one exists, otherwise start a new algorithm.
    val algo: OnlineAlgorithm = algoStates.get(ts.id) match {
      case Some(saved) => OnlineAlgorithm(saved)
      case None        => newAlgorithmInstance(context)
    }

    // The bounded sequence's values are overwritten in place.
    var idx = 0
    while (idx < n) {
      values(idx) = algo.next(values(idx))
      idx += 1
    }

    // An empty algorithm has nothing worth keeping, so its entry is dropped.
    if (algo.isEmpty) algoStates -= ts.id
    else algoStates.update(ts.id, algo.state)

    TimeSeries(ts.tags, s"$name(${ts.label})", seq)
  }

  // Ids that have saved state but no series in this result. In a streaming context only
  // the data reported for the interval is present, yet every saved state still has to
  // move forward, so each of these algorithms is fed a single NaN.
  for (id <- algoStates.keySet.diff(inner.data.map(_.id).toSet)) {
    val algo = OnlineAlgorithm(algoStates(id))
    algo.next(Double.NaN)
    if (algo.isEmpty) algoStates -= id
    else algoStates.update(id, algo.state)
  }

  ResultSet(this, transformed, inner.state + (this -> algoStates))
}