override def createLogic()

in atlas-pekko/src/main/scala/com/netflix/atlas/pekko/ClusterOps.scala [100:188]


    /**
      * Creates the logic that routes incoming messages to one sub-stream per cluster member.
      *
      * `Cluster` messages reconcile the tracked members: queues of members that are no longer
      * listed are completed, and for every new member a queue-backed source running through
      * the member's client flow is pre-materialized and emitted downstream. `Data` messages
      * are offered to the queue of the matching member; entries for members that are not
      * tracked are ignored.
      *
      * @param inheritedAttributes
      *     Attributes passed in by the materializer. Not used by this stage.
      * @return
      *     Stage logic acting as both the input and the output handler.
      */
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
      new GraphStageLogic(shape) with InHandler with OutHandler {

        private val registry = context.registry

        // Queue feeding the sub-stream of each member that is currently tracked
        private val membersSources = mutable.HashMap.empty[M, SourceQueue[D]]

        /** Dispatch the next message based on its type. */
        override def onPush(): Unit = {
          val msg = grab(in)
          msg match {
            case Cluster(members: Set[M]) => updateMembers(members)
            case Data(data: Map[M, D])    => pushData(data)
          }
        }

        /**
          * Reconcile the tracked members with `members`. Queues for members that went away
          * are completed and removed. For each new member a sub-stream is started and its
          * source is included in the list pushed downstream.
          */
        private def updateMembers(members: Set[M]): Unit = {
          // Immutable snapshot so the map can be modified while processing the differences
          val current = membersSources.keySet.toSet

          val removed = current -- members
          if (removed.nonEmpty) {
            logger.debug(s"members removed: $removed")
          }
          removed.foreach { m =>
            membersSources.remove(m).foreach { queue =>
              logger.debug(s"stopping $m")
              queue.complete()
            }
          }

          val added = members -- current
          if (added.nonEmpty) {
            logger.debug(s"members added: $added")
          }
          val sources = added.toList
            .map { m =>
              // Pre-materialize so the queue is available right away for offering data,
              // the returned source is what downstream will consume
              val (queue, source) = StreamOps
                .blockingQueue(registry, context.id, context.queueSize)
                .via(newSubFlow(m))
                .preMaterialize()(materializer)

              membersSources += m -> queue
              source
            }

          // Always push, even if the list is empty. This consumes the downstream demand so
          // that the next `onPull` will request another message from upstream.
          push(out, sources)
        }

        /**
          * Create the flow used to process the data for member `m`. The client flow is
          * restarted with a backoff of 100ms to 1s if it terminates. Any exception that
          * still reaches the outer flow is suppressed and ends the sub-stream normally.
          */
        private def newSubFlow(m: M): Flow[D, O, NotUsed] = {
          import OpportunisticEC.*
          RestartFlow
            .withBackoff(RestartSettings(100.millis, 1.second, 0.0)) { () =>
              context.client(m).watchTermination() { (_, f) =>
                f.onComplete {
                  case Success(_) => logger.trace(s"shutdown stream for $m")
                  case Failure(t) => logger.warn(s"restarting failed stream for $m", t)
                }
              }
            }
            .recoverWithRetries(
              -1,
              {
                // Ignore non-fatal failure that may happen when a member is removed from cluster
                case e: Exception =>
                  logger.debug(s"suppressing failure for: $m", e)
                  Source.empty[O]
              }
            )
        }

        /**
          * Offer each entry to the queue of its member. Nothing is pushed downstream for a
          * data message, so upstream is pulled again here if there is still demand.
          */
        private def pushData(data: Map[M, D]): Unit = {
          data.foreachEntry { (m, d) =>
            // The result of the offer is intentionally not checked
            membersSources.get(m).foreach(_.offer(d))
          }
          if (isAvailable(out)) {
            pull(in)
          }
        }

        /** Downstream demand is forwarded as a request for the next message. */
        override def onPull(): Unit = {
          pull(in)
        }

        /** Complete the member queues before completing the stage. */
        override def onUpstreamFinish(): Unit = {
          completeAll()
          super.onUpstreamFinish()
        }

        /**
          * Release any queues that are still tracked when the stage stops. This covers the
          * cases where the stage stops without `onUpstreamFinish` being invoked, such as an
          * upstream failure or downstream cancellation. It is a no-op if the queues were
          * already completed.
          */
        override def postStop(): Unit = {
          completeAll()
          super.postStop()
        }

        /**
          * Complete all tracked queues and forget them. Clearing the map ensures a queue is
          * not completed again if this is invoked more than once.
          */
        private def completeAll(): Unit = {
          membersSources.values.foreach(_.complete())
          membersSources.clear()
        }

        setHandlers(in, out, this)
      }
    }