in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala [146:204]
/**
  * Builds the flow that issues rewrite requests to the rewrite service and converts each
  * response into a `Try[DataSources]` of the rewritten data sources.
  *
  * For an HTTP 200 response the body is decoded as a `List[Rewrite]` and zipped with the
  * data sources that were sent. Successful rewrites are cached and collected; failed ones
  * are reported via the stream context's data source logger. Any non-200 response or
  * transport-level failure increments the failure counter and is emitted as a `Failure`.
  *
  * @param client
  *     pooled HTTP client flow used to talk to the rewrite service.
  * @param context
  *     stream context, used to log per-data-source diagnostic messages.
  */
private[stream] def httpFlow(client: SuperPoolClient, context: StreamContext) = {
  Flow[(HttpRequest, List[DataSource])]
    .via(client)
    .flatMapConcat {
      case (Success(resp), dsl) =>
        unzipIfNeeded(resp)
          .map(_.utf8String)
          .map { body =>
            resp.status match {
              case StatusCodes.OK =>
                val rewrites = List.newBuilder[DataSource]
                // NOTE: We're assuming that the number of items returned will be the same as the
                // number of uris sent to the rewrite service. If they differ, data sources may be
                // mapped to IDs and steps incorrectly.
                Json
                  .decode[List[Rewrite]](body)
                  .zip(dsl)
                  .foreach {
                    case (r, ds) =>
                      if (!r.status.equals("OK")) {
                        val msg =
                          DiagnosticMessage.error(s"failed rewrite of ${ds.uri()}: ${r.message}")
                        context.dsLogger(ds, msg)
                      } else {
                        // Cache so later batches can skip the service for this uri.
                        rewriteCache.put(ds.uri(), r.rewrite)
                        rewrites += new DataSource(ds.id, ds.step(), r.rewrite)
                      }
                  }
                rewriteSuccess.increment()
                Success(DataSources.of(rewrites.result().toArray: _*))
              case _ =>
                logger.error(
                  "Error from rewrite service. status={}, resp={}",
                  resp.status,
                  body
                )
                registry
                  .counter(
                    rewriteFailures.withTags("status", resp.status.toString(), "exception", "NA")
                  )
                  .increment()
                Failure(
                  new RuntimeException(
                    s"Error from rewrite service. status=${resp.status}, resp=$body"
                  )
                )
            }
          }
      case (Failure(ex), _) =>
        // Transport-level failure: no response body to inspect, tag by exception class.
        logger.error("Failure from rewrite service", ex)
        registry
          .counter(
            rewriteFailures.withTags("status", "0", "exception", ex.getClass.getSimpleName)
          )
          .increment()
        Source.single(Failure(ex))
    }
}