override def createLogic()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala [77:219]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // Batch updaters for the meters so hot-path increments are cheap; batches are
      // flushed explicitly in flush() and closed on upstream finish. Presumably these
      // are Spectator counter batch updaters — confirm against the meter declarations
      // earlier in the file.
      private val droppedOldUpdater = droppedOld.batchUpdater(10_000)
      private val droppedFutureUpdater = droppedFuture.batchUpdater(10_000)
      private val bufferedUpdater = buffered.batchUpdater(10_000)

      // One aggregation map per in-flight time window. A small fixed number of windows
      // (numBuffers) is kept open at once to tolerate modest reordering of input data.
      private val buf = new Array[AggrMap](numBuffers)
      buf.indices.foreach { i =>
        buf(i) = new AggrMap
      }

      // Timestamp of the window held in each buffer slot; 0 means the slot has not
      // been used yet.
      private val timestamps = new Array[Long](numBuffers)

      // Step size (window width) of the most recently seen datapoint; used when
      // emitting TimeGroups. -1 until the first datapoint arrives.
      private var step = -1L
      // Datapoints with timestamp <= cutoffTime are considered too old and dropped.
      // Advanced each time a buffer is flushed.
      private var cutoffTime = 0L

      // Groups still waiting for downstream demand after upstream has finished.
      private var pending: List[TimeGroup] = Nil

      /**
        * Find the buffer slot for timestamp `t`. Returns the slot index if a buffer
        * already holds that window. Otherwise returns `-pos - 1` where `pos` is the
        * slot to reuse (decode with `-result - 1`), chosen as the slot holding the
        * oldest window. NOTE(review): the "oldest" slot is found by comparing only
        * adjacent entries, which is not a general minimum scan — it appears to rely
        * on the buffers being filled/rotated in increasing time order; confirm that
        * invariant before changing this.
        */
      private def findBuffer(t: Long): Int = {
        var i = 0
        var min = 0
        while (i < timestamps.length) {
          if (timestamps(i) == t) return i
          if (i > 0 && timestamps(i) < timestamps(i - 1)) min = i
          i += 1
        }
        -min - 1
      }

      /**
        * Add a value to the aggregate or create a new aggregate initialized to the provided
        * value. Heartbeat datapoints will be ignored as they are just used to trigger flushing
        * of the time group.
        */
      private def aggregate(i: Int, v: AggrDatapoint): Unit = {
        if (!v.isHeartbeat) {
          val aggr = buf(i).get(v.expr)
          if (aggr == null) {
            buf(i).put(v.expr, AggrDatapoint.newAggregator(v, aggrSettings))
          } else {
            aggr.aggregate(v)
          }
        }
      }

      /**
        * Push the most recently completed time group to the next stage and reset the buffer
        * so it can be used for a new time window. Also flushes the batched meter updates
        * and advances `cutoffTime` to the flushed window's timestamp, so any later data
        * for that window (or older) will be dropped. Returns None if the slot was unused
        * (timestamp 0).
        */
      private def flush(i: Int): Option[TimeGroup] = {
        droppedOldUpdater.flush()
        droppedFutureUpdater.flush()
        bufferedUpdater.flush()
        val t = timestamps(i)
        val group = if (t > 0) Some(toTimeGroup(t, buf(i))) else None
        cutoffTime = t
        buf(i) = new AggrMap
        group
      }

      /**
        * Convert an aggregation map into a TimeGroup for timestamp `ts`. Expressions
        * whose aggregator exceeded the configured datapoint limit are logged via the
        * context and excluded from the result.
        */
      private def toTimeGroup(ts: Long, aggrMap: AggrMap): TimeGroup = {
        import scala.jdk.CollectionConverters.*
        val aggregateMapForExpWithinLimits = aggrMap.asScala
          .filter {
            case (expr, aggr) if aggr.limitExceeded =>
              context.logDatapointsExceeded(ts, expr.toString)
              false
            case _ =>
              true
          }
          .map {
            case (expr, aggr) => expr -> AggrValuesInfo(aggr.datapoints, aggr.numInputDatapoints)
          }
          .toMap

        TimeGroup(ts, step, aggregateMapForExpWithinLimits)
      }

      /**
        * Route each incoming datapoint: drop values with timestamps in the future or at/before
        * the cutoff (old heartbeats are ignored silently, i.e. not counted as dropped);
        * otherwise aggregate into the matching buffer, flushing and reusing the oldest
        * buffer slot when the timestamp needs a new window. Pushes any completed groups
        * downstream, or pulls for more input if there is nothing to emit.
        */
      override def onPush(): Unit = {
        val builder = List.newBuilder[TimeGroup]
        val now = clock.wallTime()
        val tuple = grab(in)
        tuple.data.foreach { v =>
          val t = v.timestamp
          step = v.step
          if (t > now) {
            droppedFutureUpdater.increment()
          } else if (t <= cutoffTime) {
            if (!v.isHeartbeat) {
              droppedOldUpdater.increment()
            }
          } else {
            bufferedUpdater.increment()
            val i = findBuffer(t)
            if (i >= 0) {
              aggregate(i, v)
            } else {
              // Negative result encodes the slot to evict: flush its completed window
              // first, then start the new window in that slot.
              val pos = -i - 1
              builder ++= flush(pos)
              aggregate(pos, v)
              timestamps(pos) = t
            }
          }
        }
        val groups = builder.result()
        if (groups.isEmpty && tuple.messages.isEmpty)
          pull(in)
        else
          push(out, TimeGroupsTuple(groups, tuple.messages))
      }

      /**
        * On downstream demand: if upstream is already closed, drain any pending groups
        * (and complete once drained); otherwise request the next element.
        */
      override def onPull(): Unit = {
        if (isClosed(in))
          flushPending()
        else
          pull(in)
      }

      /**
        * When upstream finishes, convert all remaining buffers into groups (unused slots
        * with timestamp 0 are filtered out), queue them in timestamp order, attempt to
        * emit them, and close the batched meter updaters so final counts are recorded.
        */
      override def onUpstreamFinish(): Unit = {
        val groups = buf.indices.map { i =>
          toTimeGroup(timestamps(i), buf(i))
        }.toList
        pending = groups.filter(_.timestamp > 0).sortWith(_.timestamp < _.timestamp)
        flushPending()
        droppedOldUpdater.close()
        droppedFutureUpdater.close()
        bufferedUpdater.close()
      }

      /**
        * Emit pending groups if downstream has signalled demand, then complete the stage
        * once nothing is pending. If there is pending data but no demand yet, completion
        * is deferred until the next onPull.
        */
      private def flushPending(): Unit = {
        if (pending.nonEmpty && isAvailable(out)) {
          push(out, TimeGroupsTuple(pending))
          pending = Nil
        }
        if (pending.isEmpty) {
          completeStage()
        }
      }

      setHandlers(in, out, this)
    }
  }