private def pubSubMediator: ActorRef = DistributedPubSub()

in pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionist.scala [58:108]


  /**
   * The mediator of the `DistributedPubSub` instance obtained for `system`.
   * Declared as a `def`, so the lookup is repeated on every access.
   */
  private def pubSubMediator: ActorRef = {
    val pubSub = DistributedPubSub(system)
    pubSub.mediator
  }

  /**
   * Makes `actor` reachable for the clients by sending a `Put` for it to the
   * pub-sub mediator.
   * Clients address the actor with `Send` or `SendToAll`, using the path
   * elements of the `ActorRef`, e.g. `"/user/myservice"`.
   *
   * @param actor the service actor to expose to clients
   */
  def registerService(actor: ActorRef): Unit = {
    val mediator = pubSubMediator
    mediator ! DistributedPubSubMediator.Put(actor)
  }

  /**
   * Explicitly unregisters a previously registered service actor.
   * Termination of a registered actor unregisters it automatically; this method
   * is for removing it earlier than that.
   *
   * @param actor the service actor to remove; it is identified by its path
   *              without the address part
   */
  def unregisterService(actor: ActorRef): Unit = {
    val mediator = pubSubMediator
    val servicePath = actor.path.toStringWithoutAddress
    mediator ! DistributedPubSubMediator.Remove(servicePath)
  }

  /**
   * Subscribes `actor` to the named `topic` so that clients can reach it.
   * Any number of actors may be registered under the same topic name; each of
   * them receives the messages published to it.
   * Clients publish to the topic with `Publish`.
   *
   * @param topic name of the topic to subscribe to
   * @param actor the subscriber to register
   */
  def registerSubscriber(topic: String, actor: ActorRef): Unit = {
    val mediator = pubSubMediator
    mediator ! DistributedPubSubMediator.Subscribe(topic, actor)
  }

  /**
   * Explicitly removes `actor` from the subscribers of `topic`.
   * Termination of a registered subscriber unregisters it automatically; this
   * method is for removing it earlier than that.
   *
   * @param topic name of the topic the actor was subscribed to
   * @param actor the subscriber to remove
   */
  def unregisterSubscriber(topic: String, actor: ActorRef): Unit = {
    val mediator = pubSubMediator
    mediator ! DistributedPubSubMediator.Unsubscribe(topic, actor)
  }

  /**
   * Binding of the receptionist's gRPC endpoint.
   * Being a `val`, the block runs once while the enclosing instance is
   * initialized: it logs the target address, builds the service handler and
   * asks `Http()` to bind it at `settings.hostPort`. The resulting future is
   * kept as-is; this block does not itself react to a failed binding.
   */
  private val server: Future[Http.ServerBinding] = {
    log.info("Starting ClusterClientReceptionist gRPC server at {}", settings.hostPort)

    // Implicits in scope for the service handler factory and for `Http()`.
    implicit val sys: ActorSystem = system
    implicit val materializer: Materializer = Materializer(sys)

    // Created before the implementation so construction order stays the same.
    val clientSerialization = new ClusterClientSerialization(system)

    val receptionistImpl =
      new ClusterClientReceptionistGrpcImpl(settings, pubSubMediator, clientSerialization)(
        materializer,
        log)

    val requestHandler: HttpRequest => Future[HttpResponse] =
      ClusterClientReceptionistServiceHandler(receptionistImpl)

    Http().bindAndHandleAsync(
      requestHandler,
      interface = settings.hostPort.hostname,
      port = settings.hostPort.port,
      connectionContext = HttpConnectionContext())
  }