override def createLogic()

in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EddaGroupsLookup.scala [50:120]


  /**
    * Creates the stage logic. Every element grabbed from `in` results in exactly one source
    * being pushed to `out`:
    *
    *  - If the element has sources, the pushed source runs the lookup across the distinct
    *    Edda URIs once per tick of a stoppable repeating source and emits the element paired
    *    with the collected groups.
    *  - If the element has no sources, a single-element source carrying empty data sources
    *    and an empty group list is pushed.
    *
    * The tick source belonging to the previous element is stopped when the next element
    * arrives, and the current one is stopped when the upstream finishes or fails.
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // Handle to the tick source driving the periodic lookup for the most recent element.
      // It is None before the first element and after an element without sources.
      private var lookupTickSwitch: Option[SourceRef[NotUsed, NotUsed]] = None

      // Tells the currently tracked tick source, if there is one, to stop.
      private def stopLookupTick(): Unit = {
        lookupTickSwitch.foreach(ref => ref.stop())
      }

      override def onPush(): Unit = {
        import scala.jdk.CollectionConverters.*

        // A new element supersedes the previous one, so the tick source created for the
        // previous element is stopped before anything else happens.
        stopLookupTick()

        val current = grab(in)

        if (!current.sources.isEmpty) {
          // Map each data source to the Edda URI of its backend. A data source that cannot
          // be resolved is reported through the data source logger and left out.
          val eddaUris = current.sources.asScala
            .flatMap { ds =>
              try {
                Option(context.findEurekaBackendForUri(Uri(ds.uri)).eddaUri)
              } catch {
                case e: Exception =>
                  val diagnostic = DiagnosticMessage.error(e)
                  context.dsLogger(ds, diagnostic)
                  None
              }
            }
            .toList
            .distinct

          // One lookup source per distinct URI
          val eddaSources = eddaUris.map(uri => EddaSource(uri, context))

          // Run the lookups one after another and gather all responses into a single
          // groups value. Responses are prepended as they arrive.
          val lookup = Source(eddaSources)
            .flatMapConcat(src => src)
            .fold(List.empty[GroupResponse])((collected, response) => response :: collected)
            .map(responses => current -> Groups(responses))

          // Repeat the lookup on every tick until the tick source is stopped
          val ticker = EvaluationFlows.stoppableSource[NotUsed, NotUsed](
            EvaluationFlows.repeat(NotUsed, frequency)
          )
          lookupTickSwitch = Option(ticker)
          push(out, ticker.source.flatMapConcat(_ => lookup))
        } else {
          // Nothing to look up: emit a one-shot empty result rather than a repeating
          // source, so there is no tick source to track or stop later.
          lookupTickSwitch = None
          push(out, Source.single[SourcesAndGroups](DataSources.empty() -> Groups(List.empty)))
        }
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        completeStage()
        stopLookupTick()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        super.onUpstreamFailure(ex)
        stopLookupTick()
      }

      setHandlers(in, out, this)
    }
  }