in management-cluster-http/src/main/scala/org/apache/pekko/management/cluster/scaladsl/ClusterHttpManagementRoutes.scala [138:235]
/**
 * Route that streams cluster domain events to the caller as server-sent events.
 *
 * Handles `GET` requests and reads the repeated `type` query parameter. Each value is looked up
 * in a fixed name-to-class table; names that are not in the table are ignored. When no `type`
 * parameter is supplied at all, the subscription covers `ClusterEvent.ClusterDomainEvent`.
 * When `type` values are supplied but none of them is known, no subscription is made and the
 * response stream carries only keep-alive heartbeats.
 *
 * @param cluster the cluster extension used to subscribe and unsubscribe the stream's actor
 */
private def routeGetClusterDomainEvents(cluster: Cluster) = {
  import pekko.actor.ActorRef
  import pekko.cluster.ClusterEvent
  import pekko.http.scaladsl.marshalling.sse.EventStreamMarshalling._
  import pekko.http.scaladsl.model.sse.ServerSentEvent
  import pekko.stream.{ Materializer, OverflowStrategy }
  import pekko.stream.scaladsl.Source
  import scala.concurrent.{ ExecutionContext, Promise }

  // Event names accepted by the `type` query parameter, mapped to the class to subscribe to.
  val eventClassesByName: Map[String, Class[_]] = Map(
    "ClusterDomainEvent" -> classOf[ClusterEvent.ClusterDomainEvent],
    "MemberEvent" -> classOf[ClusterEvent.MemberEvent],
    "MemberJoined" -> classOf[ClusterEvent.MemberJoined],
    "MemberWeaklyUp" -> classOf[ClusterEvent.MemberWeaklyUp],
    "MemberUp" -> classOf[ClusterEvent.MemberUp],
    "MemberLeft" -> classOf[ClusterEvent.MemberLeft],
    "MemberExited" -> classOf[ClusterEvent.MemberExited],
    "MemberDowned" -> classOf[ClusterEvent.MemberDowned],
    "MemberRemoved" -> classOf[ClusterEvent.MemberRemoved],
    "LeaderChanged" -> classOf[ClusterEvent.LeaderChanged],
    "RoleLeaderChanged" -> classOf[ClusterEvent.RoleLeaderChanged],
    "ClusterShuttingDown" -> ClusterEvent.ClusterShuttingDown.getClass,
    "ReachabilityEvent" -> classOf[ClusterEvent.ReachabilityEvent],
    "UnreachableMember" -> classOf[ClusterEvent.UnreachableMember],
    "ReachableMember" -> classOf[ClusterEvent.ReachableMember],
    "DataCenterReachabilityEvent" -> classOf[ClusterEvent.DataCenterReachabilityEvent],
    "UnreachableDataCenter" -> classOf[ClusterEvent.UnreachableDataCenter],
    "ReachableDataCenter" -> classOf[ClusterEvent.ReachableDataCenter])

  extractMaterializer { implicit mat: Materializer =>
    implicit val ec: ExecutionContext = mat.executionContext

    get {
      parameter("type".as[String].*) { requestedTypes =>
        // Known classes for the requested names, in reverse request order; unknown names
        // contribute nothing. An absent parameter selects every cluster domain event.
        val subscribedClasses: List[Class[_]] =
          if (requestedTypes.isEmpty)
            List(classOf[ClusterEvent.ClusterDomainEvent])
          else
            requestedTypes.toList.reverse.flatMap(name => eventClassesByName.get(name).toList)

        // Completed with Some(ref) when the source is materialized, or with None when the
        // stream terminates before that has happened.
        val sourceActor = Promise[Option[ActorRef]]()

        // Runs `action` with the stream's actor once the promise resolves, but only when an
        // actor is available and at least one event class was selected.
        def withSourceActor(action: ActorRef => Unit): Unit =
          sourceActor.future.foreach { maybeRef =>
            maybeRef.foreach { ref =>
              if (subscribedClasses.nonEmpty) action(ref)
            }
          }

        val rawEvents = Source.actorRef[ClusterEvent.ClusterDomainEvent](
          completionMatcher = PartialFunction.empty,
          failureMatcher = PartialFunction.empty,
          bufferSize = 128,
          overflowStrategy = OverflowStrategy.fail)

        val clusterEvents = rawEvents
          .map(ClusterDomainEventServerSentEventEncoder.encode)
          // events for which the encoder yields None are dropped
          .collect { case Some(sse) => sse }
          .keepAlive(10.seconds, () => ServerSentEvent.heartbeat)
          .mapMaterializedValue { ref =>
            sourceActor.success(Some(ref))
            ()
          }
          .watchTermination() { (_, terminated) =>
            terminated.onComplete { _ =>
              // Stream is finished: make sure the promise is resolved, then undo the
              // subscription if an actor had been published through it.
              val _ = sourceActor.trySuccess(None)
              withSourceActor(ref => cluster.unsubscribe(ref))
            }
          }

        // Subscribe as soon as the materialized actor becomes known; the current cluster
        // state is requested as events.
        withSourceActor { ref =>
          cluster.subscribe(
            ref,
            initialStateMode = ClusterEvent.InitialStateAsEvents,
            subscribedClasses: _*)
        }

        complete(clusterEvents)
      }
    }
  }
}