override def createLogic()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala [61:172]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with InHandler with OutHandler {

      private val tsState = scala.collection.mutable.HashMap.empty[String, DatapointMetadata]
      private val eventState = scala.collection.mutable.HashMap.empty[String, String]

      override def onPush(): Unit = {
        val dpBuilder = List.newBuilder[AggrDatapoint]
        val msgBuilder = List.newBuilder[Evaluator.MessageEnvelope]
        grab(in).foreach {
          case sb: LwcSubscription      => updateState(sb)
          case sb: LwcSubscriptionV2    => updateStateV2(sb)
          case dp: LwcDatapoint         => dpBuilder ++= pushDatapoint(dp)
          case ev: LwcEvent             => msgBuilder ++= pushEvent(ev)
          case dg: LwcDiagnosticMessage => msgBuilder ++= pushDiagnosticMessage(dg)
          case hb: LwcHeartbeat         => dpBuilder += pushHeartbeat(hb)
          case _                        =>
        }
        val datapoints = dpBuilder.result()
        val messages = msgBuilder.result()
        if (datapoints.isEmpty && messages.isEmpty)
          pull(in)
        else
          push(out, DatapointsTuple(datapoints, messages))
      }

      private def updateState(sub: LwcSubscription): Unit = {
        sub.metrics.foreach { m =>
          if (!tsState.contains(m.id)) {
            val expr = parseExpr(m.expression)
            tsState.put(m.id, DatapointMetadata(m.expression, expr, m.step))
          }
        }
      }

      private def updateStateV2(sub: LwcSubscriptionV2): Unit = {
        sub.subExprs.foreach { s =>
          if (isTimeSeries(sub, s)) {
            if (!tsState.contains(s.id)) {
              val expr = parseExpr(s.expression, sub.exprType)
              tsState.put(s.id, DatapointMetadata(s.expression, expr, s.step))
            }
          } else if (sub.exprType.isEventType && !eventState.contains(s.id)) {
            eventState.put(s.id, s.expression)
          }
        }
      }

      private def isTimeSeries(sub: LwcSubscriptionV2, s: LwcDataExpr): Boolean = {
        // The sample type behaves similar to a time series for most processing, but maintains
        // some sample events during aggregation.
        sub.exprType.isTimeSeriesType || s.expression.contains(",:sample")
      }

      private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
        tsState.get(dp.id) match {
          case Some(sub) =>
            val expr = sub.dataExpr
            val step = sub.step
            Some(
              AggrDatapoint(
                dp.timestamp,
                step,
                expr,
                sub.dataExprStr,
                dp.tags,
                dp.value,
                dp.samples
              )
            )
          case None =>
            unknown.increment()
            None
        }
      }

      private def pushEvent(event: LwcEvent): List[Evaluator.MessageEnvelope] = {
        eventState.get(event.id) match {
          case Some(sub) => context.messagesForDataSource(sub, EventMessage(event.payload))
          case None      => unknown.increment(); Nil
        }
      }

      private def pushDiagnosticMessage(
        diagMsg: LwcDiagnosticMessage
      ): List[Evaluator.MessageEnvelope] = {
        tsState.get(diagMsg.id) match {
          case Some(sub) =>
            context.messagesForDataSource(sub.dataExprStr, diagMsg.message)
          case None =>
            eventState.get(diagMsg.id) match {
              case Some(sub) => context.messagesForDataSource(sub, diagMsg.message)
              case None      => unknown.increment(); Nil
            }
        }
      }

      private def pushHeartbeat(hb: LwcHeartbeat): AggrDatapoint = {
        AggrDatapoint.heartbeat(hb.timestamp, hb.step)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        completeStage()
      }

      setHandlers(in, out, this)
    }
  }