private[stream] def httpFlow()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala [146:204]


  /**
    * Builds the flow that sends rewrite requests through `client` and turns each response
    * into a `Success` holding the rewritten data sources or a `Failure` describing the error.
    *
    * Handling per response:
    *
    *  - Transport failure: the exception is logged, counted under `rewriteFailures` with
    *    `status=0`, and emitted as a `Failure`.
    *  - Non-OK HTTP status: the status and body are logged, counted under `rewriteFailures`,
    *    and emitted as a `Failure` wrapping a `RuntimeException`.
    *  - OK HTTP status: the body is decoded as a list of `Rewrite` entries that are paired
    *    positionally with the data sources that were sent. Entries whose status is "OK" are
    *    stored in `rewriteCache` and included in the result; all other entries are reported
    *    to the data source via `context.dsLogger` and omitted from the result.
    *
    * @param client
    *     Flow used to execute the HTTP requests.
    * @param context
    *     Stream context, used here to report per data source diagnostic messages.
    * @return
    *     Flow from a request and the data sources it covers to the outcome of the rewrite.
    */
  private[stream] def httpFlow(client: SuperPoolClient, context: StreamContext) = {
    Flow[(HttpRequest, List[DataSource])]
      .via(client)
      .flatMapConcat {
        case (Success(resp), dsl) =>
          unzipIfNeeded(resp)
            .map(_.utf8String)
            .map { body =>
              resp.status match {
                case StatusCodes.OK =>
                  val results = Json.decode[List[Rewrite]](body)

                  // NOTE: We're assuming that the number of items returned will be the same as
                  // the number of uris sent to the rewrite service. If they differ, data sources
                  // may be mapped to IDs and steps incorrectly because `zip` pairs by position
                  // and drops any unmatched tail. Log it so the mismatch is at least visible.
                  if (results.size != dsl.size) {
                    logger.warn(
                      s"Rewrite service returned ${results.size} results for ${dsl.size} " +
                        "data sources, rewrites may be matched to the wrong data source"
                    )
                  }

                  val rewrites = List.newBuilder[DataSource]
                  results.zip(dsl).foreach {
                    case (r, ds) =>
                      // `==` is null-safe, so an entry with a missing status is reported as
                      // a failed rewrite rather than throwing and failing the stream.
                      if (r.status == "OK") {
                        rewriteCache.put(ds.uri(), r.rewrite)
                        rewrites += new DataSource(ds.id, ds.step(), r.rewrite)
                      } else {
                        val msg =
                          DiagnosticMessage.error(s"failed rewrite of ${ds.uri()}: ${r.message}")
                        context.dsLogger(ds, msg)
                      }
                  }
                  rewriteSuccess.increment()
                  Success(DataSources.of(rewrites.result().toArray: _*))
                case _ =>
                  logger.error(
                    "Error from rewrite service. status={}, resp={}",
                    resp.status,
                    body
                  )
                  registry
                    .counter(
                      rewriteFailures.withTags("status", resp.status.toString(), "exception", "NA")
                    )
                    .increment()
                  Failure(
                    new RuntimeException(
                      s"Error from rewrite service. status=${resp.status}, resp=$body"
                    )
                  )
              }
            }
        case (Failure(ex), _) =>
          // The request never produced a response, so there is no HTTP status to report;
          // "0" is used as the status tag value for this case.
          logger.error("Failure from rewrite service", ex)
          registry
            .counter(
              rewriteFailures.withTags("status", "0", "exception", ex.getClass.getSimpleName)
            )
            .increment()
          Source.single(Failure(ex))
      }
  }