in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala [86:144]
/**
  * Builds a flow that sends each incoming set of data sources through the rewrite
  * HTTP flow and emits the successfully rewritten results. When a rewrite request
  * fails, the result of `returnFromCache` for the same data sources is emitted once
  * via a side channel while the request is retried.
  *
  * @param client
  *     Client passed to `httpFlow` for issuing the rewrite requests.
  * @param context
  *     Stream context passed to `httpFlow`.
  * @param keepRetrying
  *     If true, `maxRetries` is set to -1 (intended to mean retry without a limit --
  *     confirm against the RetryFlow implementation in use); otherwise no retries
  *     are attempted.
  * @return
  *     Flow emitting rewritten data sources merged with any cached fallbacks. If
  *     rewriting is not enabled, the input is passed through unchanged.
  */
def rewrite(
  client: SuperPoolClient,
  context: StreamContext,
  keepRetrying: Boolean
): Flow[DataSources, DataSources, NotUsed] = {
  if (!enabled) {
    // Rewriting is disabled: pass the data sources through untouched.
    Flow[DataSources]
  } else {
    // Side channel used to publish cached results while the rewrite call is failing.
    val (cachedQueue, cachedSource) = StreamOps
      .blockingQueue[DataSources](registry, "cachedRewrites", 1)
      .toMat(BroadcastHub.sink(1))(Keep.both)
      .run()

    // True once a cached result has been published for the current run of failures.
    // NOTE(review): plain var shared between the retry decision and the downstream
    // stage; assumes both execute without an async boundary in between -- confirm.
    var sentCacheData = false

    val retryFlow = RetryFlow
      .withBackoff(
        minBackoff = 100.milliseconds,
        maxBackoff = 5.second,
        randomFactor = 0.35,
        maxRetries = if (keepRetrying) -1 else 0,
        flow = httpFlow(client, context)
      ) {
        case (original, resp) =>
          resp match {
            case Success(_) => None
            case Failure(ex) =>
              val (request, dsl) = original
              logger.debug("Retrying the rewrite request due to error", ex)
              // Only publish the cached fallback once per run of failures.
              if (!sentCacheData) {
                if (!cachedQueue.offer(returnFromCache(dsl))) {
                  // note that this should never happen.
                  logger.error("Unable to send cached results to queue.")
                } else {
                  sentCacheData = true
                }
              }
              // Retry with the same request and data sources.
              Some(request -> dsl)
          }
      }
      .watchTermination() { (_, f) =>
        // Complete the side channel when the retry flow terminates.
        f.onComplete { _ =>
          cachedQueue.complete()
        }
      }

    Flow[DataSources]
      .map(_.sources().asScala.toList)
      .map(dsl => constructRequest(dsl) -> dsl)
      .via(retryFlow)
      .collect {
        case Success(rewritten) =>
          // Reset the cached flag on every successful rewrite. This must live inside
          // the per-element function: a bare statement followed by `_.get` in a block
          // is evaluated only once, when the flow is constructed.
          sentCacheData = false
          rewritten
      }
      .merge(cachedSource)
  }
}