private def source()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala [158:207]


  /**
    * Build the synthetic message stream for an event expression.
    *
    * The stream starts with a single subscription message describing the expression
    * and is followed by generated payloads. For every step interval index in
    * `0 until settings.numStepIntervals` (throttled to one index per `settings.step`
    * milliseconds), `settings.inputDataSize` synthetic records are produced. Each
    * record is a map with the keys `tags`, `i` (interval index) and `j` (record
    * index). How a record is emitted depends on the expression variant:
    *
    *  - `Raw`: the full record as an `LwcEvent`.
    *  - `Table`: an `LwcEvent` holding only the requested column values, with `null`
    *    for columns that are not present in the record.
    *  - `Sample`: an `LwcDatapoint` with value `1.0` whose timestamp is the aligned
    *    start time plus the interval offset; nothing is emitted when `groupByKey`
    *    returns `null` for the record.
    *
    * @param settings
    *     Supplies the step size, the number of step intervals, and the number of
    *     records to generate per interval.
    * @param expr
    *     Event expression the synthetic data is generated for.
    * @return
    *     Source of JSON encoded messages wrapped as `ByteString`s.
    */
  private def source(settings: Settings, expr: EventExpr): Source[ByteString, NotUsed] = {
    val exprId = computeId(ExprType.EVENTS, expr, settings.step)
    val subscription = LwcSubscriptionV2(
      expr.toString,
      ExprType.EVENTS,
      List(LwcDataExpr(exprId, expr.toString, settings.step))
    )

    // Wall clock time rounded down to a multiple of the step size
    val now = System.currentTimeMillis()
    val startTime = now - now % settings.step

    // One tick per step interval, paced at a single tick per step duration
    val intervals = Source(0 until settings.numStepIntervals)
      .throttle(1, FiniteDuration(settings.step, TimeUnit.MILLISECONDS))

    val events = intervals.flatMapConcat { interval =>
      Source(0 until settings.inputDataSize)
        .flatMap { index =>
          val queryTags = Query.tags(expr.query)
          val payload = Map(
            "tags" -> queryTags,
            "i"    -> interval,
            "j"    -> index
          )

          // Value for a requested column, null if the record has no such key
          def columnValue(name: String) = payload.getOrElse(name, null)

          expr match {
            case EventExpr.Raw(_) =>
              val node = Json.decode[JsonNode](Json.encode(payload))
              Source.single(LwcEvent(exprId, node))
            case EventExpr.Table(_, columnNames) =>
              val row = columnNames.map(columnValue)
              val node = Json.decode[JsonNode](Json.encode(row))
              Source.single(LwcEvent(exprId, node))
            case EventExpr.Sample(_, groupBy, columnNames) =>
              if (groupByKey(payload, groupBy) != null) {
                val timestamp = startTime + interval * settings.step
                val row = columnNames.map(columnValue)
                val datapoint = LwcDatapoint(
                  timestamp,
                  exprId,
                  queryTags ++ groupBy.map(key => key -> payload(key).toString),
                  1.0,
                  List(row)
                )
                Source.single(datapoint)
              } else {
                // No group key could be computed for this record, so it is dropped
                Source.empty
              }
          }
        }
    }

    // Subscription goes out first, then the generated payloads, all JSON encoded
    val messages = Source
      .single(subscription)
      .concat(events)
    messages.map(message => ByteString(Json.encode(message)))
  }