in cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala [1059:1191]
def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
  val from = envelope.from
  val remoteGossip =
    try {
      envelope.gossip
    } catch {
      case NonFatal(t) =>
        gossipLogger.logWarning("Invalid Gossip. This should only happen during a rolling upgrade. {}", t.getMessage)
        Gossip.empty
    }
  val localGossip = latestGossip
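
  // Sanity checks: ignore gossip that is empty (failed to deserialize), addressed to another
  // node, sent from an unknown or unreachable member, or that does not contain this node.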
  if (remoteGossip eq Gossip.empty) {
    gossipLogger.logDebug("Ignoring received gossip from [{}] to protect against overload", from)
    Ignored
  } else if (envelope.to != selfUniqueAddress) {
    gossipLogger.logInfo(
      "Ignoring received gossip intended for someone else, from [{}] to [{}]",
      from.address,
      envelope.to)
    Ignored
  } else if (!localGossip.hasMember(from)) {
    gossipLogger.logInfo("Ignoring received gossip from unknown [{}]", from)
    Ignored
  } else if (!localGossip.isReachable(selfUniqueAddress, from)) {
    gossipLogger.logInfo("Ignoring received gossip from unreachable [{}] ", from)
    Ignored
  } else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) {
    gossipLogger.logInfo("Ignoring received gossip that does not contain myself, from [{}]", from)
    Ignored
  } else {
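    // Compare the vector clocks to determine whether the remote gossip is the same as,
    // older than, newer than, or concurrent with the local gossip.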
    val comparison = remoteGossip.version.compareTo(localGossip.version)

    val (winningGossip, talkback, gossipType) = comparison match {
      case VectorClock.Same =>
        // same version
        val talkback = !exitingTasksInProgress && !remoteGossip.seenByNode(selfUniqueAddress)
        (remoteGossip.mergeSeen(localGossip), talkback, Same)
      case VectorClock.Before =>
        // local is newer
        (localGossip, true, Older)
      case VectorClock.After =>
        // remote is newer
        val talkback = !exitingTasksInProgress && !remoteGossip.seenByNode(selfUniqueAddress)
        (remoteGossip, talkback, Newer)
      case _ =>
        // conflicting versions, merge
        // We can tell that a member was removed when it is missing from one of the gossips
        // but has status Down or Exiting in the other gossip.
        // Perform the same pruning (clear of VectorClock) as the leader did when removing a member.
        // Removal of the member itself is handled in merge (pickHighestPriority)
        val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) =>
          if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
            gossipLogger.logDebug("Pruned conflicting local gossip: {}", m)
            g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
          } else
            g
        }
        val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) =>
          if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
            gossipLogger.logDebug("Pruned conflicting remote gossip: {}", m)
            g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
          } else
            g
        }

        (prunedRemoteGossip.merge(prunedLocalGossip), true, Merge)
    }

    // Don't mark gossip state as seen while exiting is in progress, e.g.
    // shutting down singleton actors. This delays removal of the member until
    // the exiting tasks have been completed.
    membershipState = membershipState.copy(
      latestGossip =
        if (exitingTasksInProgress) winningGossip
        else winningGossip.seen(selfUniqueAddress))
    assertLatestGossip()

    // remove all nodes that are new in the merged gossip from the failure detectors
    latestGossip.members.foreach { node =>
      if (!localGossip.members(node)) {
        failureDetector.remove(node.address)
        crossDcFailureDetector.remove(node.address)
      }
    }

    gossipLogger.logDebug("Receiving gossip from [{}]", from)

    if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) {
      gossipLogger.logDebug(
        """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
        remoteGossip,
        localGossip,
        winningGossip)
    }
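
    // Update gossip statistics (merge/same/newer/older counts) when stats collection is enabled.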
    if (statsEnabled) {
      gossipStats = gossipType match {
        case Merge   => gossipStats.incrementMergeCount()
        case Same    => gossipStats.incrementSameCount()
        case Newer   => gossipStats.incrementNewerCount()
        case Older   => gossipStats.incrementOlderCount()
        case Ignored => gossipStats // included in receivedGossipCount
      }
    }

    publishMembershipState()
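
    // React to changes of this member's own status in the merged gossip.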
    val selfStatus = latestGossip.member(selfUniqueAddress).status
    if (selfStatus == Exiting && !exitingTasksInProgress) {
      // ExitingCompleted will be received via CoordinatedShutdown to continue
      // the leaving process. Meanwhile the gossip state is not marked as seen.
      exitingTasksInProgress = true
      if (coordShutdown.shutdownReason().isEmpty)
        logInfo("Exiting, starting coordinated shutdown")
      selfExiting.trySuccess(Done)
      coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
    }
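
    // If this member has just been downed (Down in the merged gossip but not yet in the
    // previous local gossip), shut this node down.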
    if (selfStatus == Down && localGossip.member(selfUniqueAddress).status != Down) {
      logWarning("Received gossip where this member has been downed, from [{}]", from.address)
      shutdownSelfWhenDown()
    }

    if (talkback) {
      // send gossip back to sender() when sender() had a different view, i.e. the gossips
      // were merged, or sender() had older or newer gossip
      gossipTo(from, sender())
    }

    gossipType
  }
}