def apply()

in atlas-webapi/src/main/scala/com/netflix/atlas/webapi/FetchRequestSource.scala [71:115]


  def apply(system: ActorRefFactory, graphCfg: GraphConfig): Source[ChunkStreamPart, NotUsed] = {
    // Builds the chunked body for a fetch request. The returned stream emits, in order:
    //   1. whatever `createMetadataSource(graphCfg)` produces,
    //   2. the JSON form of the evaluated data, interleaved with periodic heartbeats,
    //   3. the JSON form of `DiagnosticMessage.close`.
    // Every element is wrapped in `prefix`/`suffix` and emitted as one `ChunkStreamPart`.
    import org.apache.pekko.pattern.*
    import scala.concurrent.duration.*

    val db = system.actorSelection("/user/db")

    // The range returned by `roundToStep` is partitioned using a window of
    // `chunkSize * step`, expressed in milliseconds; each resulting context is fetched
    // and evaluated separately below.
    val stepSize = graphCfg.roundedStepSize
    val (rangeStart, rangeEnd) = roundToStep(stepSize, graphCfg.resStart, graphCfg.resEnd)
    val fullRange = EvalContext(rangeStart.toEpochMilli, rangeEnd.toEpochMilli, stepSize)
    val contexts = fullRange.partition(chunkSize * stepSize, ChronoUnit.MILLIS)

    // At most one heartbeat per 10 seconds; shaping mode delays elements rather than
    // failing the stream when the rate is exceeded.
    val heartbeats = Source
      .repeat(DiagnosticMessage.info("heartbeat"))
      .throttle(1, 10.seconds, 1, ThrottleMode.Shaping)

    val metadataSrc = createMetadataSource(graphCfg)

    // Upper bound on how long to wait for the db actor to answer a single request.
    val askTimeout = Timeout(30.seconds)

    // Ask the db actor for each context in sequence. Replies that are not a
    // `DataResponse` are dropped by the `collect`.
    val fetched = Source(contexts).flatMapConcat { ctx =>
      val request = DataRequest(graphCfg).copy(context = ctx)
      Source
        .future(ask(db, request)(askTimeout))
        .collect { case DataResponse(data) => DataChunk(ctx, data) }
    }

    // Run the chunks through the evaluation flow and flatten each emitted batch into
    // individual messages.
    val evaluated = fetched
      .via(new EvalFlow(graphCfg))
      .flatMapConcat(batch => Source(batch))

    // `recover` sits before the merge so that an upstream failure becomes a final error
    // message and the data side then completes normally. Because the merge completes
    // eagerly, the otherwise endless heartbeat source stops as soon as the data side is
    // done.
    val dataSrc = evaluated
      .recover { case e: Throwable => DiagnosticMessage.error(e) }
      .merge(heartbeats, eagerComplete = true)
      .map(_.toJson)

    val closeSrc = Source.single(DiagnosticMessage.close).map(_.toJson)

    // The close message trails the data, and the metadata precedes both.
    val dataThenClose = dataSrc.concat(closeSrc)
    metadataSrc
      .concat(dataThenClose)
      .map(msg => ChunkStreamPart(ByteString(s"$prefix$msg$suffix")))
  }