def rewrite()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala [86:144]


  def rewrite(
    client: SuperPoolClient,
    context: StreamContext,
    keepRetrying: Boolean
  ): Flow[DataSources, DataSources, NotUsed] = {
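    // If rewriting is disabled, return an identity flow that passes the data sources through unchanged.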
    if (!enabled) {
      return Flow[DataSources]
    }

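    // Queue and broadcast hub used to emit previously cached rewrite results when a
    // rewrite request fails; the flag below ensures the cache is only sent once until
    // a request succeeds again.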
    val (cachedQueue, cachedSource) = StreamOps
      .blockingQueue[DataSources](registry, "cachedRewrites", 1)
      .toMat(BroadcastHub.sink(1))(Keep.both)
      .run()
    var sentCacheData = false

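    // Retries the rewrite request with exponential backoff; on a failure the cached
    // results are offered to the queue the first time, before scheduling another attempt.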
    val retryFlow = RetryFlow
      .withBackoff(
        minBackoff = 100.milliseconds,
        maxBackoff = 5.second,
        randomFactor = 0.35,
        maxRetries = if (keepRetrying) -1 else 0,
        flow = httpFlow(client, context)
      ) {
        case (original, resp) =>
          resp match {
            case Success(_) => None
            case Failure(ex) =>
              val (request, dsl) = original
              logger.debug("Retrying the rewrite request due to error", ex)
              if (!sentCacheData) {
                if (!cachedQueue.offer(returnFromCache(dsl))) {
                  // note that this should never happen.
                  logger.error("Unable to send cached results to queue.")
                } else {
                  sentCacheData = true
                }
              }
              Some(request -> dsl)
          }
      }
      .watchTermination() { (_, f) =>
        f.onComplete { _ =>
          cachedQueue.complete()
        }
      }

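    // Build the rewrite request from the incoming data sources, run it through the
    // retrying HTTP flow, keep only successful responses, and merge in any cached
    // results emitted while the rewrite service was failing.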
    Flow[DataSources]
      .map(_.sources().asScala.toList)
      .map(dsl => constructRequest(dsl) -> dsl)
      .via(retryFlow)
      .filter(_.isSuccess)
      .map { response =>
        // reset the cached flag so the next failure can emit cached results again
        sentCacheData = false
        response.get
      }
      .merge(cachedSource)
  }
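
A minimal sketch of how the returned flow might be wired into a stream; it is illustrative, not the project's actual wiring. Only rewrite itself comes from the listing above: the rewriter, client, and context values are placeholders, the Pekko import paths assume a recent build (older builds use the equivalent Akka packages), and DataSources.of is assumed to construct the input set.

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}

import com.netflix.atlas.eval.stream.Evaluator.DataSources

implicit val system: ActorSystem = ActorSystem("rewrite-example")

// Placeholders assumed to come from the surrounding evaluator setup.
val rewriter: DataSourceRewriter = ???
val client: SuperPoolClient = ???
val context: StreamContext = ???

Source
  .single(DataSources.of(/* DataSource instances */))
  .via(rewriter.rewrite(client, context, keepRetrying = true))
  .runWith(Sink.foreach { rewritten =>
    // Each element is a DataSources with rewritten URIs, or the cached results
    // emitted while the rewrite service was unavailable.
    println(s"received ${rewritten.sources().size()} rewritten data sources")
  })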