protected def sync()

in atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala [50:155]


  protected def sync(subscriptions: Subscriptions): Unit = {
    val diff = Subscriptions.diff(currentSubs, subscriptions)
    currentSubs = subscriptions

    val flushableHandlers = List.newBuilder[EventHandler]

    // Pass-through events
    diff.added.events.foreach { sub =>
      val expr = ExprUtils.parseEventExpr(sub.expression)
      val q = ExprUtils.toSpectatorQuery(removeValueClause(expr.query))
      val handler = expr match {
        case EventExpr.Raw(_)       => EventHandler(sub, e => List(e))
        case EventExpr.Table(_, cs) => EventHandler(sub, e => List(LwcEvent.Row(e, cs)))
        case expr: EventExpr.Sample =>
          val converter = DatapointConverter(
            sub.id,
            sub.expression,
            expr.dataExpr,
            clock,
            sub.step,
            Some(event => expr.projectionKeys.map(event.extractValueSafe)),
            submit
          )
          EventHandler(
            sub,
            event => {
              converter.update(event)
              Nil
            },
            Some(converter)
          )
      }
      index.add(q, handler)
      subHandlers.put(sub, q -> handler)
      if (handler.converter.isDefined)
        flushableHandlers += handler
    }
    diff.unchanged.events.foreach { sub =>
      val handlerMeta = subHandlers.get(sub)
      if (handlerMeta != null && handlerMeta._2.converter.isDefined)
        flushableHandlers += handlerMeta._2
    }
    diff.removed.events.foreach(removeSubscription)

    // Analytics based on events
    diff.added.timeSeries.foreach { sub =>
      val expr = ExprUtils.parseDataExpr(sub.expression)
      val converter =
        DatapointConverter(sub.id, sub.expression, expr, clock, sub.step, None, submit)
      val q = ExprUtils.toSpectatorQuery(removeValueClause(expr.query))
      val handler = EventHandler(
        sub,
        event => {
          converter.update(event)
          Nil
        },
        Some(converter)
      )
      index.add(q, handler)
      subHandlers.put(sub, q -> handler)
      flushableHandlers += handler
    }
    diff.unchanged.timeSeries.foreach { sub =>
      val handlerMeta = subHandlers.get(sub)
      if (handlerMeta != null)
        flushableHandlers += handlerMeta._2
    }
    diff.removed.timeSeries.foreach(removeSubscription)

    // Trace pass-through
    traceHandlers = subscriptions.traceEvents.map { sub =>
      sub -> ExprUtils.parseTraceEventsQuery(sub.expression)
    }.toMap

    // Analytics based on traces
    diff.added.traceTimeSeries.foreach { sub =>
      val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
      val dataExpr = tq.expr.expr.dataExprs.head
      val converter =
        DatapointConverter(sub.id, sub.expression, dataExpr, clock, sub.step, None, submit)
      val q = ExprUtils.toSpectatorQuery(removeValueClause(dataExpr.query))
      val handler = EventHandler(
        sub,
        event => {
          converter.update(event)
          Nil
        },
        Some(converter)
      )
      subHandlers.put(sub, q -> handler)
      flushableHandlers += handler
    }
    diff.unchanged.traceTimeSeries.foreach { sub =>
      val handlerMeta = subHandlers.get(sub)
      if (handlerMeta != null)
        flushableHandlers += handlerMeta._2
    }
    diff.removed.traceTimeSeries.foreach(sub => subHandlers.remove(sub))
    traceHandlersTS = subscriptions.traceTimeSeries.map { sub =>
      val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
      val tts = TraceTimeSeries(tq.q, removeValueClause(tq.expr.expr.dataExprs.head.query))
      sub -> tts
    }.toMap

    handlers = flushableHandlers.result()
  }