in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala [61:172]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {
private val tsState = scala.collection.mutable.HashMap.empty[String, DatapointMetadata]
private val eventState = scala.collection.mutable.HashMap.empty[String, String]
/**
 * Handles one batch of LWC messages grabbed from `in`. The messages are processed
 * sequentially, so state registered by a subscription message is visible to any
 * datapoint, event, or diagnostic message that follows it in the same batch.
 *
 * If the batch yields no datapoints and no messages, more input is requested from
 * upstream. Otherwise a single `DatapointsTuple` is pushed downstream.
 */
override def onPush(): Unit = {
  val aggrPoints = List.newBuilder[AggrDatapoint]
  val envelopes = List.newBuilder[Evaluator.MessageEnvelope]

  grab(in).foreach { msg =>
    msg match {
      // Subscription messages only update the local state, nothing is emitted for them
      case subscription: LwcSubscription =>
        updateState(subscription)
      case subscription: LwcSubscriptionV2 =>
        updateStateV2(subscription)
      // Payload messages are converted and collected for the downstream tuple
      case datapoint: LwcDatapoint =>
        pushDatapoint(datapoint).foreach(aggrPoints += _)
      case event: LwcEvent =>
        envelopes ++= pushEvent(event)
      case diagnostic: LwcDiagnosticMessage =>
        envelopes ++= pushDiagnosticMessage(diagnostic)
      case heartbeat: LwcHeartbeat =>
        aggrPoints += pushHeartbeat(heartbeat)
      // Any other kind of message is ignored
      case _ =>
    }
  }

  val datapoints = aggrPoints.result()
  val messages = envelopes.result()
  if (datapoints.nonEmpty || messages.nonEmpty) {
    push(out, DatapointsTuple(datapoints, messages))
  } else {
    // Nothing to emit for this batch, so ask upstream for the next one
    pull(in)
  }
}
/**
 * Registers the metrics of a subscription message in `tsState`. An id that is already
 * present keeps its existing entry; the expression is only parsed for ids that have not
 * been seen before.
 */
private def updateState(sub: LwcSubscription): Unit = {
  // The guard is checked per element, right before the body runs for that element
  for (metric <- sub.metrics if !tsState.contains(metric.id)) {
    val parsed = parseExpr(metric.expression)
    tsState.put(metric.id, DatapointMetadata(metric.expression, parsed, metric.step))
  }
}
/**
 * Registers the sub-expressions of a V2 subscription message. Expressions that should be
 * handled as time series (see `isTimeSeries`) are parsed and stored in `tsState`. For the
 * remaining ones, the expression string is stored in `eventState` if the subscription has
 * an event type. Ids that are already registered keep their existing entry.
 */
private def updateStateV2(sub: LwcSubscriptionV2): Unit = {
  for (dataExpr <- sub.subExprs) {
    if (isTimeSeries(sub, dataExpr)) {
      // The metadata, including the parse, is only computed if the id is not yet known
      tsState.getOrElseUpdate(
        dataExpr.id,
        DatapointMetadata(
          dataExpr.expression,
          parseExpr(dataExpr.expression, sub.exprType),
          dataExpr.step
        )
      )
    } else if (sub.exprType.isEventType) {
      eventState.getOrElseUpdate(dataExpr.id, dataExpr.expression)
    }
  }
}
/**
 * Returns true if the sub-expression should be tracked as a time series: either the
 * subscription has a time series type, or the expression string contains `,:sample`.
 */
private def isTimeSeries(sub: LwcSubscriptionV2, s: LwcDataExpr): Boolean = {
  if (sub.exprType.isTimeSeriesType) {
    true
  } else {
    // Sample expressions are processed much like a time series, though some sample
    // events are maintained during aggregation.
    s.expression.contains(",:sample")
  }
}
/**
 * Converts an LWC datapoint into an [[AggrDatapoint]] using the metadata registered in
 * `tsState` for its id. The timestamp, tags, value, and samples come from the datapoint;
 * the step and expression come from the metadata.
 *
 * If no metadata is registered for the id, `unknown.increment()` is called and `None` is
 * returned.
 */
private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
  val converted = tsState.get(dp.id).map { meta =>
    AggrDatapoint(
      dp.timestamp,
      meta.step,
      meta.dataExpr,
      meta.dataExprStr,
      dp.tags,
      dp.value,
      dp.samples
    )
  }
  if (converted.isEmpty) {
    unknown.increment()
  }
  converted
}
/**
 * Wraps the payload of an LWC event in an `EventMessage` and builds the messages for the
 * expression registered in `eventState` under the event id. If the id is not registered,
 * `unknown.increment()` is called and an empty list is returned.
 */
private def pushEvent(event: LwcEvent): List[Evaluator.MessageEnvelope] = {
  eventState
    .get(event.id)
    .map { exprStr =>
      context.messagesForDataSource(exprStr, EventMessage(event.payload))
    }
    .getOrElse {
      unknown.increment()
      Nil
    }
}
/**
 * Builds the messages for a diagnostic message. The expression string is looked up by id,
 * first in `tsState` and, only if there is no match, in `eventState`. If neither has an
 * entry for the id, `unknown.increment()` is called and an empty list is returned.
 */
private def pushDiagnosticMessage(
  diagMsg: LwcDiagnosticMessage
): List[Evaluator.MessageEnvelope] = {
  // `orElse` takes its argument by name, so the event lookup is skipped on a time series hit
  val exprStr = tsState
    .get(diagMsg.id)
    .map(_.dataExprStr)
    .orElse(eventState.get(diagMsg.id))

  exprStr match {
    case Some(str) =>
      context.messagesForDataSource(str, diagMsg.message)
    case None =>
      unknown.increment()
      Nil
  }
}
/**
 * Creates the heartbeat datapoint for an LWC heartbeat, carrying over its timestamp and
 * step.
 */
private def pushHeartbeat(hb: LwcHeartbeat): AggrDatapoint =
  AggrDatapoint.heartbeat(hb.timestamp, hb.step)
/** Forwards downstream demand by requesting the next element from `in`. */
override def onPull(): Unit =
  pull(in)
/** Completes this stage as soon as the upstream has finished. */
override def onUpstreamFinish(): Unit =
  completeStage()
setHandlers(in, out, this)
}
}