protected def newAlgorithmInstance()

in atlas-core/src/main/scala/com/netflix/atlas/core/model/StatefulExpr.scala [256:303]


    protected def newAlgorithmInstance(context: EvalContext): OnlineAlgorithm

    def dataExprs: List[DataExpr] = expr.dataExprs

    def isGrouped: Boolean = expr.isGrouped

    def groupByKey(tags: Map[String, String]): Option[String] = expr.groupByKey(tags)

    def finalGrouping: List[String] = expr.finalGrouping

    def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = {
      val rs = expr.eval(context, data)
      val state = rs.state.getOrElse(this, new StateMap).asInstanceOf[StateMap]

      // Update expressions with data
      val newData = rs.data.map { t =>
        val bounded = t.data.bounded(context.start, context.end)
        val length = bounded.data.length
        val algo = state.get(t.id).fold(newAlgorithmInstance(context)) { s =>
          OnlineAlgorithm(s)
        }
        var i = 0
        while (i < length) {
          bounded.data(i) = algo.next(bounded.data(i))
          i += 1
        }
        if (algo.isEmpty)
          state -= t.id
        else
          state(t.id) = algo.state
        TimeSeries(t.tags, s"$name(${t.label})", bounded)
      }

      // Update the stateful buffers for expressions that do not have an explicit value for
      // this interval. For streaming contexts only data that is reported for that interval
      // will be present, but the state needs to be moved for all entries.
      val noDataIds = state.keySet.diff(rs.data.map(_.id).toSet)
      noDataIds.foreach { id =>
        val algo = OnlineAlgorithm(state(id))
        algo.next(Double.NaN)
        if (algo.isEmpty)
          state -= id
        else
          state(id) = algo.state
      }

      ResultSet(this, newData, rs.state + (this -> state))
    }