in atlas-webapi/src/main/scala/com/netflix/atlas/webapi/FetchRequestSource.scala [71:115]
def apply(system: ActorRefFactory, graphCfg: GraphConfig): Source[ChunkStreamPart, NotUsed] = {
import org.apache.pekko.pattern.*
import scala.concurrent.duration.*
val dbRef = system.actorSelection("/user/db")
val chunks = {
val step = graphCfg.roundedStepSize
val (fstart, fend) = roundToStep(step, graphCfg.resStart, graphCfg.resEnd)
EvalContext(fstart.toEpochMilli, fend.toEpochMilli, step)
.partition(chunkSize * step, ChronoUnit.MILLIS)
}
val heartbeatSrc = Source
.repeat(DiagnosticMessage.info("heartbeat"))
.throttle(1, 10.seconds, 1, ThrottleMode.Shaping)
val metadataSrc = createMetadataSource(graphCfg)
val dataSrc = Source(chunks)
.flatMapConcat { chunk =>
val req = DataRequest(graphCfg).copy(context = chunk)
val future = ask(dbRef, req)(Timeout(30.seconds))
Source
.future(future)
.collect {
case DataResponse(data) => DataChunk(chunk, data)
}
}
.via(new EvalFlow(graphCfg))
.flatMapConcat(ts => Source(ts))
.recover {
case t: Throwable => DiagnosticMessage.error(t)
}
.merge(heartbeatSrc, eagerComplete = true)
.map(_.toJson)
val closeSrc = Source.single(DiagnosticMessage.close).map(_.toJson)
metadataSrc
.concat(dataSrc.concat(closeSrc))
.map { msg =>
val bytes = ByteString(s"$prefix$msg$suffix")
ChunkStreamPart(bytes)
}
}