in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala [158:207]
/**
  * Build the byte stream for a single event expression. The stream starts with the
  * subscription message for the expression and is followed by synthetic data generated
  * for it.
  *
  * The step intervals `0 until settings.numStepIntervals` are throttled to one interval
  * per `settings.step` milliseconds. For each interval, `settings.inputDataSize` synthetic
  * records are generated and converted based on the type of the expression:
  *
  *  - `EventExpr.Raw`: the record is emitted as an `LwcEvent`.
  *  - `EventExpr.Table`: the selected columns are emitted as an `LwcEvent`.
  *  - `EventExpr.Sample`: an `LwcDatapoint` with a value of 1.0 and the selected columns
  *    as the sample is emitted, or nothing if no group by key can be computed.
  *
  * @param settings
  *     Supplies the step size, number of step intervals, and number of records to
  *     generate per interval.
  * @param expr
  *     Event expression to generate data for.
  * @return
  *     Source of JSON encoded messages, subscription message first.
  */
private def source(settings: Settings, expr: EventExpr): Source[ByteString, NotUsed] = {
  val id = computeId(ExprType.EVENTS, expr, settings.step)
  val dataExpr = LwcDataExpr(id, expr.toString, settings.step)
  val subMessage = LwcSubscriptionV2(expr.toString, ExprType.EVENTS, List(dataExpr))

  // Current time rounded down to a multiple of the step
  val start = System.currentTimeMillis() / settings.step * settings.step

  // Only depends on the expression, so compute it once rather than for every record
  val tags = Query.tags(expr.query)

  val exprSource = Source(0 until settings.numStepIntervals)
    .throttle(1, FiniteDuration(settings.step, TimeUnit.MILLISECONDS))
    .flatMapConcat { i =>
      Source(0 until settings.inputDataSize)
        // Each record maps to a sub-stream of zero or one messages; flatMapConcat flattens
        // them while preserving the order of the records
        .flatMapConcat { j =>
          val data = Map(
            "tags" -> tags,
            "i"    -> i,
            "j"    -> j
          )
          expr match {
            case EventExpr.Raw(_) =>
              val json = Json.decode[JsonNode](Json.encode(data))
              Source.single(LwcEvent(id, json))
            case EventExpr.Table(_, cs) =>
              // Columns that are not present in the record are encoded as null
              val columns = cs.map(c => data.getOrElse(c, null))
              val json = Json.decode[JsonNode](Json.encode(columns))
              Source.single(LwcEvent(id, json))
            case EventExpr.Sample(_, by, cs) =>
              val groupKey = groupByKey(data, by)
              if (groupKey == null) {
                Source.empty
              } else {
                val timestamp = i * settings.step + start
                val columns = cs.map(c => data.getOrElse(c, null))
                // NOTE(review): data(k) assumes a non-null group key implies every key in
                // `by` is present in the record -- confirm against groupByKey
                val datapoint = LwcDatapoint(
                  timestamp,
                  id,
                  tags ++ by.map(k => k -> data(k).toString),
                  1.0,
                  List(columns)
                )
                Source.single(datapoint)
              }
          }
        }
    }

  Source
    .single(subMessage)
    .concat(exprSource)
    .map(msg => ByteString(Json.encode(msg)))
}