def props()

in cluster-tools/src/main/scala/org/apache/pekko/cluster/singleton/ClusterSingletonManager.scala [176:432]


  def props(singletonProps: Props, terminationMessage: Any, settings: ClusterSingletonManagerSettings): Props =
    Props(new ClusterSingletonManager(singletonProps, terminationMessage, settings))
      .withDispatcher(Dispatchers.InternalDispatcherId)
      .withDeploy(Deploy.local)

  /**
   * INTERNAL API
   * public due to the `with FSM` type parameters
   */
  sealed trait State

  /**
   * INTERNAL API
   * public due to the `with FSM` type parameters
   */
  sealed trait Data

  /**
   * INTERNAL API
   */
  private[pekko] object Internal {

    /**
     * Sent from new oldest to previous oldest to initiate the
     * hand-over process. `HandOverInProgress` and `HandOverDone`
     * are expected replies.
     */
    case object HandOverToMe extends ClusterSingletonMessage with DeadLetterSuppression

    /**
     * Confirmation by the previous oldest that the hand
     * over process, shut down of the singleton actor, has
     * started.
     */
    case object HandOverInProgress extends ClusterSingletonMessage

    /**
     * Confirmation by the previous oldest that the singleton
     * actor has been terminated and the hand-over process is
     * completed.
     */
    case object HandOverDone extends ClusterSingletonMessage

    /**
     * Sent from from previous oldest to new oldest to
     * initiate the normal hand-over process.
     * Especially useful when new node joins and becomes
     * oldest immediately, without knowing who was previous
     * oldest.
     */
    case object TakeOverFromMe extends ClusterSingletonMessage with DeadLetterSuppression

    final case class HandOverRetry(count: Int)
    final case class TakeOverRetry(count: Int)
    case object LeaseRetry
    case object Cleanup
    case object StartOldestChangedBuffer

    case object Start extends State
    case object AcquiringLease extends State
    case object Oldest extends State
    case object Younger extends State
    case object BecomingOldest extends State
    case object WasOldest extends State
    case object HandingOver extends State
    case object TakeOver extends State
    case object Stopping extends State
    case object End extends State

    case object Uninitialized extends Data
    final case class YoungerData(oldest: List[UniqueAddress]) extends Data
    final case class BecomingOldestData(previousOldest: List[UniqueAddress]) extends Data
    final case class OldestData(singleton: Option[ActorRef]) extends Data
    final case class WasOldestData(singleton: Option[ActorRef], newOldestOption: Option[UniqueAddress]) extends Data
    final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
    final case class StoppingData(singleton: ActorRef) extends Data
    case object EndData extends Data
    final case class DelayedMemberRemoved(member: Member)
    case object SelfExiting
    case class AcquiringLeaseData(leaseRequestInProgress: Boolean, singleton: Option[ActorRef]) extends Data

    val HandOverRetryTimer = "hand-over-retry"
    val TakeOverRetryTimer = "take-over-retry"
    val CleanupTimer = "cleanup"
    val LeaseRetryTimer = "lease-retry"

    object OldestChangedBuffer {

      /**
       * Request to deliver one more event.
       */
      case object GetNext

      /**
       * The first event, corresponding to CurrentClusterState.
       */
      final case class InitialOldestState(oldest: List[UniqueAddress], safeToBeOldest: Boolean)

      final case class OldestChanged(oldest: Option[UniqueAddress])
    }

    final case class AcquireLeaseResult(holdingLease: Boolean) extends DeadLetterSuppression
    final case class ReleaseLeaseResult(released: Boolean) extends DeadLetterSuppression
    final case class AcquireLeaseFailure(t: Throwable) extends DeadLetterSuppression
    final case class ReleaseLeaseFailure(t: Throwable) extends DeadLetterSuppression
    final case class LeaseLost(reason: Option[Throwable]) extends DeadLetterSuppression

    /**
     * Notifications of member events that track oldest member are tunneled
     * via this actor (child of ClusterSingletonManager) to be able to deliver
     * one change at a time. Avoiding simultaneous changes simplifies
     * the process in ClusterSingletonManager. ClusterSingletonManager requests
     * next event with `GetNext` when it is ready for it. Only one outstanding
     * `GetNext` request is allowed. Incoming events are buffered and delivered
     * upon `GetNext` request.
     */
    class OldestChangedBuffer(role: Option[String]) extends Actor {
      import OldestChangedBuffer._

      val cluster = Cluster(context.system)
      // sort by age, oldest first
      val ageOrdering = Member.ageOrdering
      var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)

      var changes = Vector.empty[AnyRef]

      // subscribe to MemberEvent, re-subscribe when restart
      override def preStart(): Unit = {
        cluster.subscribe(self, classOf[MemberEvent])

        // It's a delicate difference between CoordinatedShutdown.PhaseClusterExiting and MemberExited.
        // MemberExited event is published immediately (leader may have performed that transition on other node),
        // and that will trigger run of CoordinatedShutdown, while PhaseClusterExiting will happen later.
        // Using PhaseClusterExiting in the singleton because the graceful shutdown of sharding region
        // should preferably complete before stopping the singleton sharding coordinator on same node.
        val coordShutdown = CoordinatedShutdown(context.system)
        coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { () =>
          if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
            Future.successful(Done)
          } else {
            implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
            self.ask(SelfExiting).mapTo[Done]
          }
        }
      }
      override def postStop(): Unit = cluster.unsubscribe(self)

      private val selfDc = ClusterSettings.DcRolePrefix + cluster.settings.SelfDataCenter

      def matchingRole(member: Member): Boolean =
        member.hasRole(selfDc) && role.forall(member.hasRole)

      def trackChange(block: () => Unit): Unit = {
        val before = membersByAge.headOption
        block()
        val after = membersByAge.headOption
        if (before != after)
          changes :+= OldestChanged(after.map(_.uniqueAddress))
      }

      def handleInitial(state: CurrentClusterState): Unit = {
        // all members except Joining and WeaklyUp
        membersByAge = immutable.SortedSet
          .empty(ageOrdering)
          .union(state.members.filter(m => m.upNumber != Int.MaxValue && matchingRole(m)))

        // If there is some removal in progress of an older node it's not safe to immediately become oldest,
        // removal of younger nodes doesn't matter. Note that it can also be started via restart after
        // ClusterSingletonManagerIsStuck.
        val selfUpNumber = state.members
          .collectFirst { case m if m.uniqueAddress == cluster.selfUniqueAddress => m.upNumber }
          .getOrElse(Int.MaxValue)
        val oldest = membersByAge.takeWhile(_.upNumber <= selfUpNumber)
        val safeToBeOldest = !oldest.exists { m =>
          m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving
        }

        val initial = InitialOldestState(oldest.toList.map(_.uniqueAddress), safeToBeOldest)
        changes :+= initial
      }

      def add(m: Member): Unit = {
        if (matchingRole(m))
          trackChange { () =>
            // replace, it's possible that the upNumber is changed
            membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
            membersByAge += m
          }
      }

      def remove(m: Member): Unit = {
        if (matchingRole(m))
          trackChange { () =>
            membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
          }
      }

      def sendFirstChange(): Unit = {
        // don't send cluster change events if this node is shutting its self down, just wait for SelfExiting
        if (!cluster.isTerminated) {
          val event = changes.head
          changes = changes.tail
          context.parent ! event
        }
      }

      def receive = {
        case state: CurrentClusterState => handleInitial(state)
        case MemberUp(m)                => add(m)
        case MemberRemoved(m, _)        => remove(m)
        case MemberExited(m) if m.uniqueAddress != cluster.selfUniqueAddress =>
          remove(m)
        case SelfExiting =>
          remove(cluster.readView.self)
          sender() ! Done // reply to ask
        case GetNext if changes.isEmpty =>
          context.become(deliverNext, discardOld = false)
        case GetNext =>
          sendFirstChange()
      }

      // the buffer was empty when GetNext was received, deliver next event immediately
      def deliverNext: Actor.Receive = {
        case state: CurrentClusterState =>
          handleInitial(state)
          sendFirstChange()
          context.unbecome()
        case MemberUp(m) =>
          add(m)
          deliverChanges()
        case MemberRemoved(m, _) =>
          remove(m)
          deliverChanges()
        case MemberExited(m) if m.uniqueAddress != cluster.selfUniqueAddress =>
          remove(m)
          deliverChanges()
        case SelfExiting =>
          remove(cluster.readView.self)
          deliverChanges()
          sender() ! Done // reply to ask
      }

      def deliverChanges(): Unit = {
        if (changes.nonEmpty) {
          sendFirstChange()
          context.unbecome()
        }
      }

      override def unhandled(msg: Any): Unit = {
        msg match {
          case _: MemberEvent => // ok, silence
          case _              => super.unhandled(msg)
        }
      }
    }
  }