in cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala [1342:1461]
def leaderActionsOnConvergence(): Unit = {
  // Computes, from the latest gossip, which members to remove and which members to move to
  // their next status, applies those changes to the gossip, logs them, and publishes the
  // resulting membership state when the gossip object actually changed.

  // Unreachable (or terminated) nodes in this data center whose status allows removal.
  val removedUnreachable = for {
    node <- membershipState.dcReachability.allUnreachableOrTerminated
    m = latestGossip.member(node)
    if m.dataCenter == selfDc && removeUnreachableWithMemberStatus(m.status)
  } yield m

  // Nodes recorded in exitingConfirmed that are still Exiting members of this data center.
  val removedExitingConfirmed = exitingConfirmed.filter { n =>
    val member = latestGossip.member(n)
    member.dataCenter == selfDc && member.status == Exiting
  }

  // Members of other data centers with a removable status; only considered for multi-DC gossip.
  // Note that this filter does not consult reachability, only the member status.
  val removedOtherDc =
    if (latestGossip.isMultiDc) {
      latestGossip.members.filter { m =>
        m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)
      }
    } else
      Set.empty[Member]

  // Members of this data center that are moved to their next status.
  val changedMembers = {
    val enoughMembers: Boolean = isMinNrOfMembersFulfilled
    def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers

    latestGossip.members.collect {
      // The block evaluates to the partial function below; upNumber is created once per
      // collect call and shared by all invocations, so each promoted member gets the next number.
      // NOTE(review): the numbers follow the iteration order of latestGossip.members - confirm
      // that this ordering is the intended one.
      var upNumber = 0

      {
        case m if m.dataCenter == selfDc && isJoiningToUp(m) && !preparingForShutdown =>
          // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
          // and minimum number of nodes have joined the cluster
          // don't move members to up when preparing for shutdown
          if (upNumber == 0) {
            // It is alright to use same upNumber as already used by a removed member, since the upNumber
            // is only used for comparing age of current cluster members (Member.isOlderThan)
            val youngest = membershipState.youngestMember
            // Int.MaxValue is treated as "no usable upNumber", so numbering restarts at 1.
            upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
          } else {
            upNumber += 1
          }
          m.copyUp(upNumber)

        case m if m.dataCenter == selfDc && m.status == Leaving =>
          // Move LEAVING => EXITING (once we have a convergence on LEAVING)
          m.copy(status = Exiting)

        case m if m.dataCenter == selfDc && m.status == PreparingForShutdown =>
          // Move PreparingForShutdown => ReadyForShutdown (once we have a convergence on PreparingForShutdown)
          m.copy(status = ReadyForShutdown)
      }
    }
  }

  val updatedGossip: Gossip =
    if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty ||
      removedOtherDc.nonEmpty) {

      // replace changed members
      val removed = removedUnreachable
        .map(_.uniqueAddress)
        .union(removedExitingConfirmed)
        .union(removedOtherDc.map(_.uniqueAddress))
      val newGossip =
        latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())

      if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
        // Leader is moving itself from Leaving to Exiting.
        // ExitingCompleted will be received via CoordinatedShutdown to continue
        // the leaving process. Meanwhile the gossip state is not marked as seen.
        exitingTasksInProgress = true
        // Only log when no shutdown reason has been recorded yet.
        if (coordShutdown.shutdownReason().isEmpty)
          logInfo("Exiting (leader), starting coordinated shutdown")
        selfExiting.trySuccess(Done)
        coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
      }

      // The confirmed-exiting nodes removed above no longer need to be tracked.
      exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)

      changedMembers.foreach { m =>
        logInfo(
          ClusterLogMarker.memberChanged(m.uniqueAddress, m.status),
          "Leader is moving node [{}] to [{}]",
          m.address,
          m.status)
      }
      removedUnreachable.foreach { m =>
        val status = if (m.status == Exiting) "exiting" else "unreachable"
        logInfo(
          ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed),
          "Leader is removing {} node [{}]",
          status,
          m.address)
      }
      removedExitingConfirmed.foreach { n =>
        logInfo(
          ClusterLogMarker.memberChanged(n, MemberStatus.Removed),
          "Leader is removing confirmed Exiting node [{}]",
          n.address)
      }
      removedOtherDc.foreach { m =>
        logInfo(
          ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed),
          "Leader is removing {} node [{}] in DC [{}]",
          m.status,
          m.address,
          m.dataCenter)
      }

      newGossip
    } else
      latestGossip

  // Tombstone pruning runs on every call, even when no member was changed or removed above.
  val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis)

  // Reference comparison: publish only when a different gossip instance was produced.
  if (pruned ne latestGossip) {
    updateLatestGossip(pruned)
    publishMembershipState()
    gossipExitingMembersToOldest(changedMembers.filter(_.status == Exiting))
  }
}