in atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala [50:155]
/**
 * Reconcile the handler state of this client with a new set of subscriptions.
 *
 * The new set is diffed against `currentSubs` and then replaces it. For each
 * category of subscription the added entries get a freshly built handler, the
 * unchanged entries reuse the handler already stored in `subHandlers`, and the
 * removed entries are dropped. The trace lookup maps (`traceHandlers` and
 * `traceHandlersTS`) are rebuilt from the full new set on every call. Finally,
 * every handler that carries a converter is published through `handlers`.
 *
 * @param subscriptions
 *     Subscriptions to diff against the current set; they become the new
 *     value of `currentSubs`.
 */
protected def sync(subscriptions: Subscriptions): Unit = {
  val changes = Subscriptions.diff(currentSubs, subscriptions)
  // The new set becomes the baseline before any handler is created.
  currentSubs = subscriptions

  // Accumulates the handlers that own a converter, in the order visited below.
  val withConverter = List.newBuilder[EventHandler]

  // Pass-through events
  for (sub <- changes.added.events) {
    val parsed = ExprUtils.parseEventExpr(sub.expression)
    val query = ExprUtils.toSpectatorQuery(removeValueClause(parsed.query))
    val eventHandler = parsed match {
      case EventExpr.Raw(_) =>
        // Forward the event as is.
        EventHandler(sub, e => List(e))
      case EventExpr.Table(_, cs) =>
        // Forward the event wrapped as a row over the requested columns.
        EventHandler(sub, e => List(LwcEvent.Row(e, cs)))
      case sample: EventExpr.Sample =>
        // Samples are fed to a converter; the handler itself emits nothing.
        val sampler = DatapointConverter(
          sub.id,
          sub.expression,
          sample.dataExpr,
          clock,
          sub.step,
          Some(event => sample.projectionKeys.map(event.extractValueSafe)),
          submit
        )
        EventHandler(
          sub,
          event => {
            sampler.update(event)
            Nil
          },
          Some(sampler)
        )
    }
    index.add(query, eventHandler)
    subHandlers.put(sub, query -> eventHandler)
    // Only handlers backed by a converter are tracked in the published list.
    eventHandler.converter.foreach(_ => withConverter += eventHandler)
  }
  for (sub <- changes.unchanged.events) {
    // A missing entry (null from the map) is silently skipped.
    Option(subHandlers.get(sub))
      .map(_._2)
      .filter(_.converter.isDefined)
      .foreach(withConverter += _)
  }
  for (sub <- changes.removed.events) {
    removeSubscription(sub)
  }

  // Analytics based on events
  for (sub <- changes.added.timeSeries) {
    val dataExpr = ExprUtils.parseDataExpr(sub.expression)
    val aggregator =
      DatapointConverter(sub.id, sub.expression, dataExpr, clock, sub.step, None, submit)
    val query = ExprUtils.toSpectatorQuery(removeValueClause(dataExpr.query))
    val tsHandler = EventHandler(
      sub,
      event => {
        aggregator.update(event)
        Nil
      },
      Some(aggregator)
    )
    index.add(query, tsHandler)
    subHandlers.put(sub, query -> tsHandler)
    withConverter += tsHandler
  }
  for (sub <- changes.unchanged.timeSeries) {
    Option(subHandlers.get(sub)).foreach(meta => withConverter += meta._2)
  }
  for (sub <- changes.removed.timeSeries) {
    removeSubscription(sub)
  }

  // Trace pass-through: rebuilt from the complete new set, not from the diff.
  traceHandlers = subscriptions.traceEvents
    .map(sub => (sub, ExprUtils.parseTraceEventsQuery(sub.expression)))
    .toMap

  // Analytics based on traces
  for (sub <- changes.added.traceTimeSeries) {
    val traceQuery = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
    val firstExpr = traceQuery.expr.expr.dataExprs.head
    val aggregator =
      DatapointConverter(sub.id, sub.expression, firstExpr, clock, sub.step, None, submit)
    val query = ExprUtils.toSpectatorQuery(removeValueClause(firstExpr.query))
    val traceHandler = EventHandler(
      sub,
      event => {
        aggregator.update(event)
        Nil
      },
      Some(aggregator)
    )
    // Recorded in subHandlers only; these handlers are not added to `index`.
    subHandlers.put(sub, query -> traceHandler)
    withConverter += traceHandler
  }
  for (sub <- changes.unchanged.traceTimeSeries) {
    Option(subHandlers.get(sub)).foreach(meta => withConverter += meta._2)
  }
  // Since they were never added to `index`, removal only touches subHandlers.
  for (sub <- changes.removed.traceTimeSeries) {
    subHandlers.remove(sub)
  }
  traceHandlersTS = subscriptions.traceTimeSeries.map { sub =>
    val traceQuery = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
    sub -> TraceTimeSeries(
      traceQuery.q,
      removeValueClause(traceQuery.expr.expr.dataExprs.head.query)
    )
  }.toMap

  handlers = withConverter.result()
}