def leaderActionsOnConvergence()

in cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala [1342:1461]


  /**
   * Leader actions that, per the inline comments below, are applied once gossip convergence has
   * been reached (the caller that establishes convergence is not visible in this block).
   *
   * The method:
   *  1. collects the members to remove:
   *     - members of the own data center (`selfDc`) that are unreachable or terminated and whose
   *       status satisfies `removeUnreachableWithMemberStatus`,
   *     - nodes in `exitingConfirmed` that belong to the own data center and still have status `Exiting`,
   *     - when the gossip is multi-DC, members of other data centers whose status satisfies
   *       `removeUnreachableWithMemberStatus`;
   *  2. collects the own-data-center members whose status advances:
   *     `Joining`/`WeaklyUp` => Up (via `copyUp`), `Leaving` => `Exiting`,
   *     `PreparingForShutdown` => `ReadyForShutdown`;
   *  3. if anything changed, builds a new gossip from `latestGossip`, starts coordinated shutdown
   *     when this node itself became `Exiting`, drops the removed nodes from `exitingConfirmed`
   *     and logs every transition;
   *  4. prunes tombstones and, if the resulting gossip is a different instance than `latestGossip`,
   *     stores and publishes it and notifies about the members that moved to `Exiting`.
   *
   * Side effects: may mutate `exitingTasksInProgress` and `exitingConfirmed`, complete `selfExiting`,
   * run `coordShutdown`, write log entries, and replace the latest gossip.
   */
  def leaderActionsOnConvergence(): Unit = {

    // Own-DC members that are unreachable/terminated and in a status that allows removal.
    val removedUnreachable = for {
      node <- membershipState.dcReachability.allUnreachableOrTerminated
      m = latestGossip.member(node)
      if m.dataCenter == selfDc && removeUnreachableWithMemberStatus(m.status)
    } yield m

    // Confirmed exiting nodes (unique addresses) of the own DC that are still Exiting in the gossip.
    val removedExitingConfirmed = exitingConfirmed.filter { n =>
      val member = latestGossip.member(n)
      member.dataCenter == selfDc && member.status == Exiting
    }

    // Members of other DCs in a removable status; only considered when the gossip spans several DCs.
    // Note that, unlike removedUnreachable, no reachability check is applied here.
    val removedOtherDc =
      if (latestGossip.isMultiDc) {
        latestGossip.members.filter { m =>
          m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)
        }
      } else
        Set.empty[Member]

    // Own-DC members whose status is advanced by the leader.
    val changedMembers = {
      // Evaluated once up front so that every member is judged against the same answer.
      val enoughMembers: Boolean = isMinNrOfMembersFulfilled
      def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers

      latestGossip.members.collect {
        // The argument block of `collect` is evaluated once, so this counter is shared by every
        // invocation of the partial function below. 0 means "not yet initialised in this pass".
        var upNumber = 0

        {
          case m if m.dataCenter == selfDc && isJoiningToUp(m) && !preparingForShutdown =>
            // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
            // and minimum number of nodes have joined the cluster
            // don't move members to up when preparing for shutdown
            if (upNumber == 0) {
              // It is alright to use same upNumber as already used by a removed member, since the upNumber
              // is only used for comparing age of current cluster members (Member.isOlderThan)
              val youngest = membershipState.youngestMember
              // Int.MaxValue is treated as "no usable upNumber", so counting restarts from 1.
              // NOTE(review): presumably Int.MaxValue marks a member that has never been Up - confirm in Member.
              upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
            } else {
              // Subsequent members promoted in the same pass get consecutive numbers.
              upNumber += 1
            }
            m.copyUp(upNumber)

          case m if m.dataCenter == selfDc && m.status == Leaving =>
            // Move LEAVING => EXITING (once we have a convergence on LEAVING)
            m.copy(status = Exiting)

          case m if m.dataCenter == selfDc && m.status == PreparingForShutdown =>
            // Move PreparingForShutdown => ReadyForShutdown (once we have a convergence on PreparingForShutdown)
            m.copy(status = ReadyForShutdown)
        }
      }
    }

    val updatedGossip: Gossip =
      if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty ||
        removedOtherDc.nonEmpty) {

        // replace changed members
        // All removals are expressed as unique addresses; removedExitingConfirmed already holds them.
        val removed = removedUnreachable
          .map(_.uniqueAddress)
          .union(removedExitingConfirmed)
          .union(removedOtherDc.map(_.uniqueAddress))
        // NOTE(review): the timestamp is presumably recorded with the removal (tombstone) - confirm in Gossip.removeAll.
        val newGossip =
          latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())

        if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
          // Leader is moving itself from Leaving to Exiting.
          // ExitingCompleted will be received via CoordinatedShutdown to continue
          // the leaving process. Meanwhile the gossip state is not marked as seen.
          // The flag is set first so this branch is entered at most once.
          exitingTasksInProgress = true
          if (coordShutdown.shutdownReason().isEmpty)
            logInfo("Exiting (leader), starting coordinated shutdown")
          selfExiting.trySuccess(Done)
          coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
        }

        // The nodes removed above no longer need to be tracked as confirmed exiting.
        exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)

        // Log every status change and removal decided in this pass.
        changedMembers.foreach { m =>
          logInfo(
            ClusterLogMarker.memberChanged(m.uniqueAddress, m.status),
            "Leader is moving node [{}] to [{}]",
            m.address,
            m.status)
        }
        removedUnreachable.foreach { m =>
          val status = if (m.status == Exiting) "exiting" else "unreachable"
          logInfo(
            ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed),
            "Leader is removing {} node [{}]",
            status,
            m.address)
        }
        removedExitingConfirmed.foreach { n =>
          logInfo(
            ClusterLogMarker.memberChanged(n, MemberStatus.Removed),
            "Leader is removing confirmed Exiting node [{}]",
            n.address)
        }
        removedOtherDc.foreach { m =>
          logInfo(
            ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed),
            "Leader is removing {} node [{}] in DC [{}]",
            m.status,
            m.address,
            m.dataCenter)
        }

        newGossip
      } else
        latestGossip

    // Tombstones older than PruneGossipTombstonesAfter are pruned on every invocation, even when
    // no member changed above.
    val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis)
    // Reference comparison: only a different Gossip instance triggers an update.
    // NOTE(review): this presumably relies on pruneTombstones returning the same instance when
    // nothing was pruned - confirm in Gossip.
    if (pruned ne latestGossip) {
      updateLatestGossip(pruned)
      publishMembershipState()
      gossipExitingMembersToOldest(changedMembers.filter(_.status == Exiting))
    }
  }