in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EddaGroupsLookup.scala [50:120]
/**
  * Creates the logic for this stage. Each element grabbed from `in` results in exactly
  * one inner source being pushed to `out`:
  *
  *  - if the element has no sources, a single-element source carrying empty data
  *    sources and an empty group list;
  *  - otherwise, a source driven by a stoppable tick source that re-runs the group
  *    lookup for every tick it emits.
  *
  * Only the most recent tick source is tracked. It is stopped when the next element
  * arrives and when the upstream finishes or fails.
  *
  * @param inheritedAttributes attributes passed in by the materializer; not used here
  * @return the stage logic, which also acts as the handler for both `in` and `out`
  */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
  new GraphStageLogic(shape) with InHandler with OutHandler {

    // Handle to the tick source behind the most recently pushed lookup source, if any
    private var lookupTickSwitch: Option[SourceRef[NotUsed, NotUsed]] = None

    // Tells the currently tracked tick source (if there is one) to stop
    private def stopActiveTicker(): Unit = lookupTickSwitch.foreach(_.stop())

    override def onPush(): Unit = {
      import scala.jdk.CollectionConverters.*

      // A new set of data sources supersedes the previous one, so the source that was
      // polling for the previous set is stopped before the new element is grabbed
      stopActiveTicker()

      val dataSources = grab(in)

      if (!dataSources.sources.isEmpty) {
        // Resolve the Edda URI for each data source. A data source that cannot be
        // resolved is reported through its logger and left out of the lookup.
        val eddaUris = dataSources.sources.asScala.flatMap { ds =>
          try {
            Option(context.findEurekaBackendForUri(Uri(ds.uri)).eddaUri)
          } catch {
            case cause: Exception =>
              val diagnostic = DiagnosticMessage.error(cause)
              context.dsLogger(ds, diagnostic)
              None
          }
        }

        // One group source per distinct URI that is needed by some data source
        val groupSources = eddaUris.toList.distinct.map(uri => EddaSource(uri, context))

        // Run the group sources one after another and gather all of their responses
        // into a single composite paired with the data sources that requested them
        val groupsLookup = Source(groupSources)
          .flatMapConcat(src => src)
          .fold(List.empty[GroupResponse])((collected, response) => response :: collected)
          .map(responses => (dataSources, Groups(responses)))

        // Ticks keep triggering a fresh lookup until the tick source is stopped
        val ticker = EvaluationFlows.stoppableSource[NotUsed, NotUsed](
          EvaluationFlows.repeat(NotUsed, frequency)
        )
        lookupTickSwitch = Option(ticker)
        push(out, ticker.source.flatMapConcat(_ => groupsLookup))
      } else {
        // Nothing to look up: emit a single-element source rather than a ticking one to
        // avoid potential delays to shutting down when the upstream completes. There
        // is nothing to stop later for Source.single, so no switch is tracked.
        lookupTickSwitch = None
        push(out, Source.single[SourcesAndGroups](DataSources.empty() -> Groups(List.empty)))
      }
    }

    override def onPull(): Unit = {
      // Downstream demand is forwarded directly to the upstream
      pull(in)
    }

    override def onUpstreamFinish(): Unit = {
      completeStage()
      stopActiveTicker()
    }

    override def onUpstreamFailure(ex: Throwable): Unit = {
      super.onUpstreamFailure(ex)
      stopActiveTicker()
    }

    setHandlers(in, out, this)
  }
}