private def routeGetClusterDomainEvents()

in management-cluster-http/src/main/scala/org/apache/pekko/management/cluster/scaladsl/ClusterHttpManagementRoutes.scala [138:235]


  private def routeGetClusterDomainEvents(cluster: Cluster) = {
    import pekko.actor.ActorRef
    import pekko.cluster.ClusterEvent
    import pekko.http.scaladsl.marshalling.sse.EventStreamMarshalling._
    import pekko.http.scaladsl.model.sse.ServerSentEvent
    import pekko.stream.{ Materializer, OverflowStrategy }
    import pekko.stream.scaladsl.Source
    import scala.concurrent.{ ExecutionContext, Promise }

    val eventClasses: Map[String, Class[_]] = Map(
      "ClusterDomainEvent" -> classOf[ClusterEvent.ClusterDomainEvent],
      "MemberEvent" -> classOf[ClusterEvent.MemberEvent],
      "MemberJoined" -> classOf[ClusterEvent.MemberJoined],
      "MemberWeaklyUp" -> classOf[ClusterEvent.MemberWeaklyUp],
      "MemberUp" -> classOf[ClusterEvent.MemberUp],
      "MemberLeft" -> classOf[ClusterEvent.MemberLeft],
      "MemberExited" -> classOf[ClusterEvent.MemberExited],
      "MemberDowned" -> classOf[ClusterEvent.MemberDowned],
      "MemberRemoved" -> classOf[ClusterEvent.MemberRemoved],
      "LeaderChanged" -> classOf[ClusterEvent.LeaderChanged],
      "RoleLeaderChanged" -> classOf[ClusterEvent.RoleLeaderChanged],
      "ClusterShuttingDown" -> ClusterEvent.ClusterShuttingDown.getClass,
      "ReachabilityEvent" -> classOf[ClusterEvent.ReachabilityEvent],
      "UnreachableMember" -> classOf[ClusterEvent.UnreachableMember],
      "ReachableMember" -> classOf[ClusterEvent.ReachableMember],
      "DataCenterReachabilityEvent" -> classOf[ClusterEvent.DataCenterReachabilityEvent],
      "UnreachableDataCenter" -> classOf[ClusterEvent.UnreachableDataCenter],
      "ReachableDataCenter" -> classOf[ClusterEvent.ReachableDataCenter])

    extractMaterializer { implicit mat: Materializer =>
      implicit val ec: ExecutionContext = mat.executionContext

      get {
        parameter("type".as[String].*) { providedEventTypes =>
          val classes =
            if (providedEventTypes.nonEmpty)
              providedEventTypes.foldLeft(List.empty[Class[_]]) {
                case (accum, eventType) =>
                  eventClasses.get(eventType).toList ::: accum
              }
            else
              List(classOf[ClusterEvent.ClusterDomainEvent])

          val eventualActorRef = Promise[Option[ActorRef]]()

          val clusterEvents = Source
            .actorRef[ClusterEvent.ClusterDomainEvent](
              completionMatcher = PartialFunction.empty,
              failureMatcher = PartialFunction.empty,
              bufferSize = 128,
              overflowStrategy = OverflowStrategy.fail)
            .map(ClusterDomainEventServerSentEventEncoder.encode)
            .collect {
              case Some(serverSentEvent) => serverSentEvent
            }
            .keepAlive(10.seconds, () => ServerSentEvent.heartbeat)
            .mapMaterializedValue { actorRef =>
              eventualActorRef.success(Some(actorRef))
              ()
            }
            .watchTermination() {
              case (_, eventualDone) =>
                eventualDone.onComplete { _ =>
                  // the stream has terminated, so complete the promise if it isn't already, and
                  // then unsubscribe if previously subscribed

                  val _ = eventualActorRef.trySuccess(None)

                  eventualActorRef.future.foreach {
                    case Some(actorRef) =>
                      if (classes.nonEmpty) {
                        cluster.unsubscribe(actorRef)
                      }

                    case None =>
                  }
                }
            }

          eventualActorRef.future.foreach {
            case Some(actorRef) =>
              if (classes.nonEmpty) {
                cluster.subscribe(
                  actorRef,
                  initialStateMode = ClusterEvent.InitialStateAsEvents,
                  classes: _*)
              }

            case None =>
          }

          complete(clusterEvents)
        }

      }
    }

  }