def matchingRole()

in cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala [623:807]


  /**
   * Checks the member against the configured `role`.
   *
   * @param m the cluster member to test
   * @return `true` when `m` has every role contained in `role`; trivially `true`
   *         when `role` is empty
   */
  def matchingRole(m: Member): Boolean = role.forall(required => m.hasRole(required))

  /**
   * Message handler of the mediator. The cases fall into these groups:
   *
   *  - delivery: `Send`, `SendToAll`, `Publish`
   *  - local registrations: `Put`, `Remove`, `Terminated`
   *  - topics: `Subscribe`, `Unsubscribe`, `RegisterTopic`, `NoMoreSubscribers`,
   *    `NewSubscriberArrived`, `GetTopics`, `Subscribed`, `Unsubscribed`, `CountSubscribers`
   *  - gossip: `Status`, `Delta`, `GossipTick`, `Prune`
   *  - cluster membership: `CurrentClusterState` and the member events, which maintain `nodes`
   *    and drop the `registry` entry of nodes that left, were downed or were removed
   *  - introspection: `Count`, `DeltaCount`
   */
  def receive = {

    case Send(path, msg, localAffinity) =>
      val localEntry = registry(selfAddress).content.get(path)
      val routees = localEntry match {
        case Some(localHolder) if localAffinity =>
          // a registration on this node wins when local affinity is requested
          localHolder.routee.toList.toVector
        case _ =>
          // otherwise gather the routees registered for the path in every bucket
          registry.valuesIterator
            .flatMap(bucket => bucket.content.get(path).toList)
            .flatMap(holder => holder.routee.toList)
            .toVector
      }

      if (routees.nonEmpty) Router(routingLogic, routees).route(wrapIfNeeded(msg), sender())
      else ignoreOrSendToDeadLetters(msg)

    case SendToAll(path, msg, skipSenderNode) =>
      publish(path, msg, skipSenderNode)

    case Publish(topic, msg, sendOneMessageToEachGroup) =>
      val topicKey = mkKey(self.path / encName(topic))
      if (sendOneMessageToEachGroup) publishToEachGroup(topicKey, msg)
      else publish(topicKey, msg)

    case Put(ref: ActorRef) =>
      if (!ref.path.address.hasGlobalScope) {
        put(mkKey(ref), Some(ref))
        context.watch(ref)
      } else
        log.warning("Registered actor must be local: [{}]", ref)

    case Remove(key) =>
      // only act when this node's bucket holds an actor reference for the key
      val registeredRef = registry(selfAddress).content.get(key).collect {
        case ValueHolder(_, Some(ref)) => ref
      }
      registeredRef.foreach { ref =>
        context.unwatch(ref)
        put(key, None)
      }

    case msg @ Subscribe(topic, _, _) =>
      // a topic is handled by the child actor named after the (encoded) topic;
      // the child is created here when it does not exist yet
      val encodedTopic = encName(topic)
      bufferOr(mkKey(self.path / encodedTopic), msg, sender()) {
        val topicActor = context.child(encodedTopic).getOrElse(newTopicActor(encodedTopic))
        topicActor.forward(msg)
      }

    case RegisterTopic(topic) =>
      registerTopic(topic)

    case NoMoreSubscribers =>
      val origin = sender()
      initializeGrouping(mkKey(origin))
      origin ! TerminateRequest

    case NewSubscriberArrived =>
      val origin = sender()
      forwardMessages(mkKey(origin), origin)

    case GetTopics =>
      sender() ! CurrentTopics(getCurrentTopics())

    case Subscribed(ack, ref) =>
      ref.tell(ack, self)

    case msg @ Unsubscribe(topic, _, _) =>
      val encodedTopic = encName(topic)
      bufferOr(mkKey(self.path / encodedTopic), msg, sender()) {
        // nothing to do when there is no child for the topic on this node
        context.child(encodedTopic).foreach(topicActor => topicActor.forward(msg))
      }

    case Unsubscribed(ack, ref) =>
      ref.tell(ack, self)

    case Status(otherVersions, isReplyToStatus) =>
      // Status is accepted from known nodes only, otherwise an old cluster with the same
      // address may interact; a local sender is accepted as well, for testing purposes
      val origin = sender().path.address
      if (nodes(origin) || origin.hasLocalScope) {
        // a gossip chat starts with a Status message carrying the other node's bucket versions
        val delta = collectDelta(otherVersions)
        if (delta.nonEmpty) sender() ! Delta(delta)
        // the other node will answer this Status with a Delta
        if (!isReplyToStatus && otherHasNewerVersions(otherVersions))
          sender() ! Status(versions = myVersions, isReplyToStatus = true)
      }

    case Delta(buckets) =>
      deltaCount += 1

      // Delta is the reply to a Status in the gossip chat and holds potential updates
      // (newer versions) from the other node. Deltas and buckets are accepted from known
      // nodes only, otherwise entries of removed nodes could be added back.
      if (nodes(sender().path.address))
        buckets.foreach { incoming =>
          if (nodes(incoming.owner)) {
            // looked up per bucket, so a later bucket sees what an earlier one stored
            val known = registry(incoming.owner)
            if (incoming.version > known.version) {
              val merged = known.copy(version = incoming.version, content = known.content ++ incoming.content)
              registry += (incoming.owner -> merged)
            }
          }
        }

    case GossipTick => gossip()

    case Prune => prune()

    case Terminated(a) =>
      val key = mkKey(a)
      // clear the local registration only if it still refers to the terminated actor
      val stillRegistered = registry(selfAddress).content.get(key) match {
        case Some(ValueHolder(_, Some(`a`))) => true
        case _                               => false
      }
      if (stillRegistered) put(key, None)
      recreateAndForwardMessagesIfNeeded(key, newTopicActor(a.path.name))

    case state: CurrentClusterState =>
      // members that are still Joining, or lack the role, are not tracked
      nodes = for {
        m <- state.members
        if m.status != MemberStatus.Joining && matchingRole(m)
      } yield m.address

    case MemberUp(m) =>
      if (matchingRole(m)) nodes += m.address

    case MemberWeaklyUp(m) =>
      if (matchingRole(m)) nodes += m.address

    case MemberLeft(m) =>
      forgetIfMatchingRole(m)

    case MemberDowned(m) =>
      forgetIfMatchingRole(m)

    case MemberRemoved(m, _) =>
      // removal of this node itself stops the mediator
      if (m.address == selfAddress) context.stop(self)
      else forgetIfMatchingRole(m)

    case _: MemberEvent => // not of interest

    case Count =>
      // number of entries, over all buckets, that hold an actor reference
      val registered = registry.valuesIterator
        .map(bucket => bucket.content.valuesIterator.count(holder => holder.ref.isDefined))
        .sum
      sender() ! registered

    case DeltaCount =>
      sender() ! deltaCount

    case msg @ CountSubscribers(topic) =>
      val encodedTopic = encName(topic)
      bufferOr(mkKey(self.path / encodedTopic), msg, sender()) {
        // without a child for the topic the answer is 0, otherwise the child is asked
        context.child(encodedTopic).fold(sender() ! 0)(topicActor => topicActor.tell(Count, sender()))
      }
  }

  /**
   * Removes the member's address from `nodes` and from `registry`, provided the member
   * passes `matchingRole`; does nothing otherwise.
   */
  private def forgetIfMatchingRole(m: Member): Unit =
    if (matchingRole(m)) {
      nodes -= m.address
      registry -= m.address
    }