in cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala [623:807]
def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
def receive = {
case Send(path, msg, localAffinity) =>
val routees = registry(selfAddress).content.get(path) match {
case Some(valueHolder) if localAffinity =>
valueHolder.routee.toList.toIndexedSeq
case _ =>
(for {
(_, bucket) <- registry
valueHolder <- bucket.content.get(path).toList
routee <- valueHolder.routee.toList
} yield routee).toVector
}
if (routees.isEmpty) ignoreOrSendToDeadLetters(msg)
else Router(routingLogic, routees).route(wrapIfNeeded(msg), sender())
case SendToAll(path, msg, skipSenderNode) =>
publish(path, msg, skipSenderNode)
case Publish(topic, msg, sendOneMessageToEachGroup) =>
if (sendOneMessageToEachGroup)
publishToEachGroup(mkKey(self.path / encName(topic)), msg)
else
publish(mkKey(self.path / encName(topic)), msg)
case Put(ref: ActorRef) =>
if (ref.path.address.hasGlobalScope)
log.warning("Registered actor must be local: [{}]", ref)
else {
put(mkKey(ref), Some(ref))
context.watch(ref)
}
case Remove(key) =>
registry(selfAddress).content.get(key) match {
case Some(ValueHolder(_, Some(ref))) =>
context.unwatch(ref)
put(key, None)
case _ =>
}
case msg @ Subscribe(topic, _, _) =>
// each topic is managed by a child actor with the same name as the topic
val encTopic = encName(topic)
bufferOr(mkKey(self.path / encTopic), msg, sender()) {
context.child(encTopic) match {
case Some(t) => t.forward(msg)
case None => newTopicActor(encTopic).forward(msg)
}
}
case RegisterTopic(t) =>
registerTopic(t)
case NoMoreSubscribers =>
val key = mkKey(sender())
initializeGrouping(key)
sender() ! TerminateRequest
case NewSubscriberArrived =>
val key = mkKey(sender())
forwardMessages(key, sender())
case GetTopics =>
sender() ! CurrentTopics(getCurrentTopics())
case Subscribed(ack, ref) =>
ref ! ack
case msg @ Unsubscribe(topic, _, _) =>
val encTopic = encName(topic)
bufferOr(mkKey(self.path / encTopic), msg, sender()) {
context.child(encTopic) match {
case Some(t) => t.forward(msg)
case None => // no such topic here
}
}
case Unsubscribed(ack, ref) =>
ref ! ack
case Status(otherVersions, isReplyToStatus) =>
// only accept status from known nodes, otherwise old cluster with same address may interact
// also accept from local for testing purposes
if (nodes(sender().path.address) || sender().path.address.hasLocalScope) {
// gossip chat starts with a Status message, containing the bucket versions of the other node
val delta = collectDelta(otherVersions)
if (delta.nonEmpty)
sender() ! Delta(delta)
if (!isReplyToStatus && otherHasNewerVersions(otherVersions))
sender() ! Status(versions = myVersions, isReplyToStatus = true) // it will reply with Delta
}
case Delta(buckets) =>
deltaCount += 1
// reply from Status message in the gossip chat
// the Delta contains potential updates (newer versions) from the other node
// only accept deltas/buckets from known nodes, otherwise there is a risk of
// adding back entries when nodes are removed
if (nodes(sender().path.address)) {
buckets.foreach { b =>
if (nodes(b.owner)) {
val myBucket = registry(b.owner)
if (b.version > myBucket.version) {
registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content))
}
}
}
}
case GossipTick => gossip()
case Prune => prune()
case Terminated(a) =>
val key = mkKey(a)
registry(selfAddress).content.get(key) match {
case Some(ValueHolder(_, Some(`a`))) =>
// remove
put(key, None)
case _ =>
}
recreateAndForwardMessagesIfNeeded(key, newTopicActor(a.path.name))
case state: CurrentClusterState =>
nodes = state.members.collect {
case m if m.status != MemberStatus.Joining && matchingRole(m) => m.address
}
case MemberUp(m) =>
if (matchingRole(m))
nodes += m.address
case MemberWeaklyUp(m) =>
if (matchingRole(m))
nodes += m.address
case MemberLeft(m) =>
if (matchingRole(m)) {
nodes -= m.address
registry -= m.address
}
case MemberDowned(m) =>
if (matchingRole(m)) {
nodes -= m.address
registry -= m.address
}
case MemberRemoved(m, _) =>
if (m.address == selfAddress)
context.stop(self)
else if (matchingRole(m)) {
nodes -= m.address
registry -= m.address
}
case _: MemberEvent => // not of interest
case Count =>
val count = registry.map {
case (_, bucket) =>
bucket.content.count {
case (_, valueHolder) => valueHolder.ref.isDefined
}
}.sum
sender() ! count
case DeltaCount =>
sender() ! deltaCount
case msg @ CountSubscribers(topic) =>
val encTopic = encName(topic)
bufferOr(mkKey(self.path / encTopic), msg, sender()) {
context.child(encTopic) match {
case Some(ref) => ref.tell(Count, sender())
case None => sender() ! 0
}
}
}