def behavior()

in cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionist.scala [272:611]


  def behavior(setup: Setup, state: State): Behavior[Command] =
    Behaviors.setup { ctx =>
      import setup._

      // Whether the leader reported by the current cluster state is this node's own address.
      def isLeader: Boolean =
        cluster.state.leader.exists(_ == cluster.selfAddress)

      // Sends replicator updates that remove the registry entries located on the given nodes.
      //
      // addresses: the nodes whose entries are candidates for removal.
      // onlyRemoveOldEntries: when true, an entry is only removed if it is at least
      //   `settings.pruneRemovedOlderThan` old; when false every entry on those nodes is removed.
      //
      // It is ok to update from several nodes but more efficient to try to do it from one node.
      def nodesRemoved(addresses: Set[UniqueAddress], onlyRemoveOldEntries: Boolean): Unit = {
        val timestamp = System.currentTimeMillis()

        def shouldRemove(entry: Entry): Boolean = {
          val onRemovedNode = addresses.contains(entry.uniqueAddress(setup.selfUniqueAddress.address))
          // it is possible that an entry is added before MemberJoined is visible,
          // and such (young) entries should not be removed
          def oldEnough = (timestamp - entry.createdTimestamp) >= settings.pruneRemovedOlderThan.toMillis
          onRemovedNode && (!onlyRemoveOldEntries || oldEnough)
        }

        // per service key, the entries to drop; keys without anything to drop are left out
        val removals: Map[AbstractServiceKey, Set[Entry]] =
          state.registry.allServices
            .map {
              case (key, entries) =>
                val serviceKey: AbstractServiceKey = key
                serviceKey -> entries.filter(shouldRemove)
            }
            .filter { case (_, doomed) => doomed.nonEmpty }
            .toMap

        if (removals.nonEmpty) {
          if (ctx.log.isDebugEnabled) {
            val removalSummary = removals
              .map {
                case (key, entries) => key.asServiceKey.id -> entries.mkString("[", ", ", "]")
              }
              .mkString(",")
            ctx.log.debugN(
              "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
              cluster.selfAddress,
              addresses.mkString(","),
              removalSummary)
          }

          // shard changes over the ddata keys they belong to, one update per ddata key
          state.registry.entriesPerDdataKey(removals).foreach {
            case (ddataKey, removalForKey) =>
              replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { current =>
                ServiceRegistry(current).removeAll(removalForKey).toORMultiMap
              }
          }
        }
      }

      // Notifies the subscribers of the given keys based on `newState`, marking the listing as
      // not caused by services being added or removed (servicesWereAddedOrRemoved = false).
      def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newState: State): Unit =
        notifySubscribers(keysForNode, servicesWereAddedOrRemoved = false, newState)

      // For every changed key that has subscribers in `newState`, builds a Listing from the active
      // actor refs of `newState` and sends it to each of those subscribers.
      //
      // changedKeys: the service keys to publish listings for.
      // servicesWereAddedOrRemoved: passed through unchanged into each Listing.
      // newState: the state the subscriptions and the actor refs are read from.
      def notifySubscribers(
          changedKeys: Set[AbstractServiceKey],
          servicesWereAddedOrRemoved: Boolean,
          newState: State): Unit =
        changedKeys.foreach { changedKey =>
          val interested = newState.subscriptions.get(changedKey)
          // keys without any subscriber are skipped, no listing is built for them
          if (interested.nonEmpty) {
            val typedKey = changedKey.asServiceKey
            val (reachable, all) = newState.activeActorRefsFor(typedKey, selfUniqueAddress)
            val update =
              ReceptionistMessages.Listing(typedKey, reachable, all, servicesWereAddedOrRemoved)
            interested.foreach(subscriber => subscriber ! update)
          }
        }

      // Handles the public receptionist protocol: Register, Deregister, Find and Subscribe.
      //
      // Register, Deregister and Subscribe are only accepted for actor refs whose address has local
      // scope; for any other ref an error is logged and the behavior is left unchanged.
      // Registry changes are not applied to the local state here, they are sent to the replicator as
      // updates.
      //
      // Throws IllegalArgumentException for any command other than the four listed above.
      def onCommand(cmd: Command): Behavior[Command] = cmd match {
        case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
          if (serviceInstance.path.address.hasLocalScope) {
            val entry = Entry(serviceInstance, setup.selfSystemUid)(System.currentTimeMillis())
            ctx.log
              .debugN("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
            // actor already watched after one service key registration
            if (!state.servicesPerActor.contains(serviceInstance))
              ctx.watchWith(serviceInstance, LocalServiceActorTerminated(serviceInstance))

            // the acknowledgement is optional, it is only sent when a replyTo was supplied
            maybeReplyTo.foreach(_ ! ReceptionistMessages.Registered(key, serviceInstance))

            val ddataKey = state.registry.ddataKeyFor(key)
            replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
              ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
            }
            behavior(setup, state.addLocalService(serviceInstance, key))
          } else {
            // two placeholders, so both the node address and the offending ref must be passed
            ctx.log.error2(
              "ClusterReceptionist [{}] - Register of non-local [{}] is not supported",
              cluster.selfAddress,
              serviceInstance)
            Behaviors.same
          }

        case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
          if (serviceInstance.path.address.hasLocalScope) {
            val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
            ctx.log.debugN(
              "ClusterReceptionist [{}] - Unregister actor: [{}] [{}]",
              cluster.selfAddress,
              key.asServiceKey.id,
              entry)
            val newState = state.removeLocalService(serviceInstance, key, setup.newTombstoneDeadline())
            if (!newState.servicesPerActor.contains(serviceInstance)) {
              // last service for actor unregistered, stop watching
              ctx.unwatch(serviceInstance)
            }

            // the acknowledgement is optional, it is only sent when a replyTo was supplied
            maybeReplyTo.foreach(_ ! ReceptionistMessages.Deregistered(key, serviceInstance))

            val ddataKey = state.registry.ddataKeyFor(key)
            replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
              ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
            }
            // tombstone removals so they are not re-added by merging with other concurrent
            // registrations for the same key
            behavior(setup, newState)
          } else {
            // two placeholders, so both the node address and the offending ref must be passed
            ctx.log.error2(
              "ClusterReceptionist [{}] - Unregistering non-local [{}] is not supported",
              cluster.selfAddress,
              serviceInstance)
            Behaviors.same
          }

        case ReceptionistMessages.Find(key, replyTo) =>
          val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
          replyTo ! ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
          Behaviors.same

        case ReceptionistMessages.Subscribe(key, subscriber) =>
          if (subscriber.path.address.hasLocalScope) {
            ctx.watchWith(subscriber, SubscriberTerminated(subscriber))

            // immediately reply with initial listings to the new subscriber
            val listing = {
              val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
              ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
            }
            subscriber ! listing

            behavior(setup, state.copy(subscriptions = state.subscriptions.inserted(key)(subscriber)))
          } else {
            // two placeholders, so both the node address and the offending ref must be passed
            ctx.log.error2(
              "ClusterReceptionist [{}] - Subscriptions from non-local [{}] is not supported",
              cluster.selfAddress,
              subscriber)
            Behaviors.same
          }

        case _ =>
          // to please exhaustiveness check, compiler does not know about internal/public command
          throw new IllegalArgumentException(s"Unexpected command [${cmd.getClass.getName}]")

      }

      // Handles the receptionist-internal messages: termination of subscribers and of locally
      // registered actors, changes delivered by the replicator, node membership and reachability
      // events, and the two periodic ticks (RemoveTick, PruneTombstonesTick).
      def onInternalCommand(cmd: InternalCommand): Behavior[Command] = cmd match {

        case SubscriberTerminated(subscriber) =>
          behavior(setup, state.removeSubscriber(subscriber))

        case LocalServiceActorTerminated(serviceInstance) =>
          // could be empty if there was a race between termination and unregistration
          val registeredKeys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
          val terminatedEntry = Entry(serviceInstance, setup.selfSystemUid)(0L)

          ctx.log.debugN(
            "ClusterReceptionist [{}] - Registered actor terminated: [{}] [{}]",
            cluster.selfAddress,
            registeredKeys.map(_.asServiceKey.id).mkString(", "),
            terminatedEntry)

          // one replicator update per service key the terminated actor was registered for
          registeredKeys.foreach { registeredKey =>
            val ddataKey = state.registry.ddataKeyFor(registeredKey.asServiceKey)
            replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { current =>
              ServiceRegistry(current).removeBinding(registeredKey.asServiceKey, terminatedEntry).toORMultiMap
            }
          }
          // tombstone removals so they are not re-added by merging with other concurrent
          // registrations for the same key
          behavior(setup, state.addTombstone(serviceInstance, setup.newTombstoneDeadline()))

        case ChangeFromReplicator(ddataKey, value) =>
          // every change will come back this way - this is where the local notifications happen
          val incomingRegistry = ServiceRegistry(value)
          val changedKeys = state.registry.collectChangedKeys(ddataKey, incomingRegistry)
          val updatedState = state.copy(registry = state.registry.withServiceRegistry(ddataKey, incomingRegistry))

          if (changedKeys.isEmpty) {
            Behaviors.same
          } else {
            if (ctx.log.isDebugEnabled) {
              ctx.log.debugN(
                "ClusterReceptionist [{}] - Change from replicator: [{}], changes: [{}], tombstones [{}]",
                cluster.selfAddress,
                incomingRegistry.entries.entries,
                changedKeys
                  .map(key => key.asServiceKey.id -> incomingRegistry.entriesFor(key).mkString("[", ", ", "]"))
                  .mkString(", "),
                state.tombstones.mkString(", "))
            }

            notifySubscribers(changedKeys, servicesWereAddedOrRemoved = true, updatedState)

            changedKeys.foreach { changedKey =>
              val serviceKey = changedKey.asServiceKey

              // because of how ORMultiMap/ORSet works, we could have a case where an actor we removed
              // is re-introduced because of a concurrent update, in that case we need to re-remove it
              val resurrected = incomingRegistry.actorRefsFor(serviceKey).filter(state.hasTombstone(serviceKey))
              if (resurrected.nonEmpty) {
                if (ctx.log.isDebugEnabled)
                  ctx.log.debug2(
                    "ClusterReceptionist [{}] - Saw ActorRefs that were tomstoned [{}], re-removing.",
                    cluster.selfAddress,
                    resurrected.mkString(", "))

                replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { current =>
                  resurrected
                    .foldLeft(ServiceRegistry(current)) { (remaining, ref) =>
                      remaining.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid)(0L))
                    }
                    .toORMultiMap
                }
              }
            }

            behavior(setup, updatedState)
          }

        case NodeAdded(uniqueAddress) =>
          if (!state.registry.nodes.contains(uniqueAddress)) {
            val updatedState = state.copy(registry = state.registry.addNode(uniqueAddress))
            val affectedKeys = updatedState.registry.keysFor(uniqueAddress)
            if (affectedKeys.isEmpty) {
              ctx.log.debug2("ClusterReceptionist [{}] - Node added [{}]", cluster.selfAddress, uniqueAddress)
            } else {
              ctx.log.debug2(
                "ClusterReceptionist [{}] - Node with registered services added [{}]",
                cluster.selfAddress,
                uniqueAddress)
              notifySubscribers(affectedKeys, servicesWereAddedOrRemoved = true, updatedState)
            }

            behavior(setup, updatedState)
          } else {
            // node is already part of the registry's node set, nothing to do
            Behaviors.same
          }

        case NodeRemoved(uniqueAddress) =>
          if (uniqueAddress == selfUniqueAddress) {
            ctx.log.debug("ClusterReceptionist [{}] - terminated/removed", cluster.selfAddress)
            // If self cluster node is shutting down our own entries should have been removed via
            // watch-Terminated or will be removed by other nodes. This point is anyway too late.
            Behaviors.stopped
          } else if (state.registry.nodes.contains(uniqueAddress)) {
            // the keys are looked up on the current state, before the node is dropped from it
            val affectedKeys = state.registry.keysFor(uniqueAddress)
            val updatedState = state.copy(registry = state.registry.removeNode(uniqueAddress))
            if (affectedKeys.nonEmpty) {
              ctx.log.debug2(
                "ClusterReceptionist [{}] - Node with registered services removed [{}]",
                cluster.selfAddress,
                uniqueAddress)
              notifySubscribers(affectedKeys, servicesWereAddedOrRemoved = true, updatedState)
            }

            // Ok to update from several nodes but more efficient to try to do it from one node.
            if (isLeader) {
              ctx.log.debug2(
                "ClusterReceptionist [{}] - Leader node observed removed node [{}]",
                cluster.selfAddress,
                uniqueAddress)
              nodesRemoved(Set(uniqueAddress), onlyRemoveOldEntries = false)
            }

            behavior(setup, updatedState)
          } else {
            Behaviors.same
          }

        case NodeUnreachable(uniqueAddress) =>
          val affectedKeys = state.registry.keysFor(uniqueAddress)
          val updatedState = state.copy(registry = state.registry.addUnreachable(uniqueAddress))
          if (affectedKeys.nonEmpty) {
            ctx.log.debug2(
              "ClusterReceptionist [{}] - Node with registered services unreachable [{}]",
              cluster.selfAddress,
              uniqueAddress)
            reachabilityChanged(affectedKeys, updatedState)
          }
          behavior(setup, updatedState)

        case NodeReachable(uniqueAddress) =>
          val affectedKeys = state.registry.keysFor(uniqueAddress)
          val updatedState = state.copy(registry = state.registry.removeUnreachable(uniqueAddress))
          if (affectedKeys.nonEmpty) {
            ctx.log.debug2(
              "ClusterReceptionist [{}] - Node with registered services reachable again [{}]",
              cluster.selfAddress,
              uniqueAddress)
            reachabilityChanged(affectedKeys, updatedState)
          }
          behavior(setup, updatedState)

        case RemoveTick =>
          // ok to update from several nodes but more efficient to try to do it from one node
          if (isLeader) {
            val knownAddresses: Set[UniqueAddress] =
              state.registry.allUniqueAddressesInState(setup.selfUniqueAddress)
            // addresses that still have entries in the state but are not in the registry's node set
            val staleAddresses = knownAddresses.filterNot(state.registry.nodes.contains)

            if (staleAddresses.nonEmpty) {
              if (ctx.log.isDebugEnabled)
                ctx.log.debug2(
                  "ClusterReceptionist [{}] - Leader node cleanup tick, removed nodes: [{}]",
                  cluster.selfAddress,
                  staleAddresses.mkString(","))
              nodesRemoved(staleAddresses, onlyRemoveOldEntries = true)
            }
          }
          Behaviors.same

        case PruneTombstonesTick =>
          val prunedState = state.pruneTombstones()
          // reference comparison: the same instance coming back is treated as "nothing changed"
          if (prunedState ne state) {
            ctx.log.debug("ClusterReceptionist [{}] - Pruning tombstones", cluster.selfAddress)
            behavior(setup, prunedState)
          } else {
            Behaviors.same
          }
      }

      // Dispatches each incoming message to the matching handler.
      // This supports two heterogeneous types of messages without union types; the internal type is
      // tested before the general Command type, and a null message is reported as unhandled.
      Behaviors.receive[Command] { (_, incoming) =>
        incoming match {
          case null                      => Behaviors.unhandled
          case internal: InternalCommand => onInternalCommand(internal)
          case external: Command         => onCommand(external)
        }
      }
    }