in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala [348:393]
/**
 * Build the `TimeGroup` for a single `DatapointGroup`.
 *
 * Every datapoint in the group is checked against each data expression. For each
 * expression whose query matches the tags of the datapoint, an `AggrDatapoint` is
 * created. The results are then grouped by expression and aggregated. Expressions
 * that do not match any datapoint of the group have no entry in the result.
 *
 * @param step
 *     Step value passed through unchanged to each `AggrDatapoint` and to the
 *     resulting `TimeGroup`.
 * @param exprs
 *     Data expressions to evaluate against the datapoints of the group.
 * @param group
 *     Supplies the timestamp and the datapoints to process.
 * @param context
 *     Supplies the values used to build the aggregator settings and is notified via
 *     `logDatapointsExceeded` when the aggregator reports that a limit was exceeded.
 * @return
 *     Time group for the timestamp of `group`, mapping each matched expression to
 *     its aggregated values along with the number of datapoints that matched it.
 */
private def toTimeGroup(
  step: Long,
  exprs: List[DataExpr],
  group: DatapointGroup,
  context: StreamContext
): TimeGroup = {
  val timestamp = group.timestamp
  val settings = AggrDatapoint.AggregatorSettings(
    context.maxInputDatapointsPerExpression,
    context.maxIntermediateDatapointsPerExpression,
    context.registry
  )

  // One AggrDatapoint for every (datapoint, matching expression) pair
  val matched = group.datapoints.asScala.zipWithIndex.flatMap {
    case (datapoint, position) =>
      val tags = datapoint.tags.asScala.toMap
      for (expr <- exprs if expr.query.matches(tags)) yield {
        // Only keep the tags that are common to all matches for the data expression:
        // the keys with an exact match in the query plus the grouping keys
        val keys = Query.exactKeys(expr.query) ++ expr.finalGrouping
        val exprTags = tags.filter { case (key, _) => keys.contains(key) }
        // Count aggregates are initialized with 1 for each datapoint, NaN values
        // are passed through as is
        val raw = datapoint.value
        val value = if (!isCount(expr) || raw.isNaN) raw else 1.0
        // The position in the group is used as the source so that datapoints will
        // not get deduped
        AggrDatapoint(timestamp, step, expr, position.toString, exprTags, value)
      }
  }

  val valuesInfo = matched.groupBy(_.expr).map {
    case (expr, points) =>
      // Number of datapoints that matched the expression, reported in all cases
      val rawCount = points.size
      val info = AggrDatapoint.aggregate(points.toList, settings) match {
        case Some(aggr) if !aggr.limitExceeded =>
          AggrValuesInfo(aggr.datapoints, rawCount)
        case Some(_) =>
          // Limit exceeded: log it and drop the values for this expression
          context.logDatapointsExceeded(timestamp, expr.toString)
          AggrValuesInfo(Nil, rawCount)
        case _ =>
          AggrValuesInfo(Nil, rawCount)
      }
      expr -> info
  }

  TimeGroup(timestamp, step, valuesInfo)
}