private def toTimeGroup()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala [348:393]


  /**
    * Build the [[TimeGroup]] for one group of datapoints sharing a timestamp.
    *
    * Every datapoint in the group is paired with each data expression whose query
    * matches its tags. The resulting entries are bucketed by expression and run
    * through `AggrDatapoint.aggregate`. Expressions that match no datapoint in the
    * group do not appear in the result.
    *
    * @param step
    *     Step value passed through to each `AggrDatapoint` and to the `TimeGroup`.
    * @param exprs
    *     Data expressions to match against the datapoints of the group.
    * @param group
    *     Source of the timestamp and the datapoints to process.
    * @param context
    *     Supplies the aggregation limits and registry, and is notified when the
    *     aggregation for an expression reports that a limit was exceeded.
    * @return
    *     Time group mapping each matched expression to its aggregated datapoints
    *     (empty if the limit was exceeded or no aggregator was produced) along with
    *     the number of entries that were fed into the aggregation.
    */
  private def toTimeGroup(
    step: Long,
    exprs: List[DataExpr],
    group: DatapointGroup,
    context: StreamContext
  ): TimeGroup = {
    val timestamp = group.timestamp
    val settings = AggrDatapoint.AggregatorSettings(
      context.maxInputDatapointsPerExpression,
      context.maxIntermediateDatapointsPerExpression,
      context.registry
    )

    // Expand each datapoint into one entry per data expression that matches its tags
    val expanded = group.datapoints.asScala.zipWithIndex.flatMap {
      case (datapoint, position) =>
        val allTags = datapoint.tags.asScala.toMap
        exprs.collect {
          case expr if expr.query.matches(allTags) =>
            // Only keep the tags shared by everything the expression can match: the
            // exact-match keys of the query plus the keys of the final grouping
            val retained = Query.exactKeys(expr.query) ++ expr.finalGrouping
            val reducedTags = allTags.filter { case (k, _) => retained.contains(k) }

            // A count aggregate is initialized with 1.0 for each datapoint that has
            // a value; NaN is passed through unchanged
            val raw = datapoint.value
            val initial = if (!isCount(expr) || raw.isNaN) raw else 1.0

            // The position within the group acts as the source so that datapoints
            // are not treated as duplicates of one another
            AggrDatapoint(timestamp, step, expr, position.toString, reducedTags, initial)
        }
    }

    // Aggregate the entries for each expression, honoring the configured limits
    val valuesInfo = expanded.groupBy(_.expr).map {
      case (expr, matched) =>
        val aggregated = AggrDatapoint.aggregate(matched.toList, settings) match {
          case Some(aggr) if aggr.limitExceeded =>
            // Results are dropped for this expression and the condition is reported
            context.logDatapointsExceeded(timestamp, expr.toString)
            Nil
          case Some(aggr) =>
            aggr.datapoints
          case _ =>
            Nil
        }
        expr -> AggrValuesInfo(aggregated, matched.size)
    }

    TimeGroup(timestamp, step, valuesInfo)
  }