in atlas-pekko/src/main/scala/com/netflix/atlas/pekko/ClusterOps.scala [100:188]
// Creates the stage logic that demultiplexes the inbound stream into one
// sub-stream per cluster member. Two message kinds arrive on `in`:
//   - Cluster(members): reconcile the set of live member sub-streams
//   - Data(map):        route each member's datum to that member's queue
// New member sub-streams are emitted downstream (on `out`) as a list of
// pre-materialized sources.
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {
private val registry = context.registry
// One queue per active member; the queue feeds that member's materialized
// sub-stream, and completing the queue terminates the sub-stream.
private val membersSources = mutable.HashMap.empty[M, SourceQueue[D]]
override def onPush(): Unit = {
val msg = grab(in)
msg match {
// NOTE(review): the `Set[M]` / `Map[M, D]` ascriptions are unchecked due to
// type erasure; correctness relies on upstream only sending these two shapes.
case Cluster(members: Set[M]) => updateMembers(members)
case Data(data: Map[M, D]) => pushData(data)
}
}
// Reconciles `membersSources` against the latest membership snapshot:
// completes and drops queues for removed members, creates a queue plus a
// restart-wrapped client flow for each added member, then pushes the list
// of newly created sources downstream.
private def updateMembers(members: Set[M]): Unit = {
val current = membersSources.keySet.toSet
val removed = current -- members
if (removed.nonEmpty) {
logger.debug(s"members removed: $removed")
}
removed.foreach { m =>
membersSources.remove(m).foreach { queue =>
// Completing the queue lets the member's sub-stream drain and stop cleanly.
logger.debug(s"stopping $m")
queue.complete()
}
}
val added = members -- current
if (added.nonEmpty) {
logger.debug(s"members added: $added")
}
val sources = added.toList
.map { m =>
// Pre-materialize so the queue can be registered immediately; data that
// arrives before downstream runs the source is buffered up to queueSize.
val (queue, source) = StreamOps
.blockingQueue(registry, context.id, context.queueSize)
.via(newSubFlow(m))
.preMaterialize()(materializer)
membersSources += m -> queue
source
}
// NOTE(review): pushed unconditionally (possibly an empty list) with no
// isAvailable(out) check — presumably safe because onPush only fires after
// downstream demand was signalled via onPull; confirm against stage contract.
push(out, sources)
}
// Builds the per-member flow: the client connection wrapped with restart
// backoff (100ms to 1s, randomFactor 0.0 so no jitter), restarting forever
// on failure. Termination of each attempt is logged via watchTermination.
private def newSubFlow(m: M): Flow[D, O, NotUsed] = {
import OpportunisticEC.*
RestartFlow
.withBackoff(RestartSettings(100.millis, 1.second, 0.0)) { () =>
context.client(m).watchTermination() { (_, f) =>
f.onComplete {
case Success(_) => logger.trace(s"shutdown stream for $m")
case Failure(t) => logger.warn(s"restarting failed stream for $m", t)
}
}
}
.recoverWithRetries(
-1,
{
// Ignore non-fatal failure that may happen when a member is removed from cluster
case e: Exception =>
logger.debug(s"suppressing failure for: $m", e)
Source.empty[O]
}
)
}
// Offers each member's datum to that member's queue; entries for unknown
// members are silently dropped.
private def pushData(data: Map[M, D]): Unit = {
data.foreachEntry { (m, d) =>
// NOTE(review): the result of `offer` is ignored — whether a full queue
// drops or blocks depends on StreamOps.blockingQueue; can't tell from here.
membersSources.get(m).foreach(_.offer(d))
}
// Data messages don't emit on `out`, so re-pull here if downstream demand
// is still outstanding; otherwise onPull will issue the next pull.
if (isAvailable(out)) {
pull(in)
}
}
override def onPull(): Unit = {
pull(in)
}
// Complete all member queues so every sub-stream shuts down cleanly when
// the inbound stream finishes.
override def onUpstreamFinish(): Unit = {
membersSources.values.foreach(_.complete())
super.onUpstreamFinish()
}
setHandlers(in, out, this)
}
}