in cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionist.scala [272:611]
def behavior(setup: Setup, state: State): Behavior[Command] =
Behaviors.setup { ctx =>
import setup._
// True when the leader reported by the current cluster state is this node's own address.
def isLeader = {
  val self = cluster.selfAddress
  cluster.state.leader.exists(_ == self)
}
/**
 * Removes the registry entries that belong to the given nodes by sending one
 * `Replicator.Update` per affected ddata key.
 *
 * It is fine to issue these updates from several nodes, but it is more efficient
 * to try to do it from a single one.
 *
 * @param addresses            unique addresses of the nodes whose entries should go
 * @param onlyRemoveOldEntries when true, only entries whose age is at least
 *                             `settings.pruneRemovedOlderThan` are removed
 */
def nodesRemoved(addresses: Set[UniqueAddress], onlyRemoveOldEntries: Boolean): Unit = {
  val now = System.currentTimeMillis()

  // An entry may be added before MemberJoined is visible here; such young entries must be kept.
  def oldEnough(entry: Entry): Boolean =
    (now - entry.createdTimestamp) >= settings.pruneRemovedOlderThan.toMillis

  def shouldRemove(entry: Entry): Boolean = {
    val onRemovedNode = addresses.contains(entry.uniqueAddress(setup.selfUniqueAddress.address))
    onRemovedNode && (if (onlyRemoveOldEntries) oldEnough(entry) else true)
  }

  // Only keys that actually lose at least one entry end up in the result.
  val removals =
    state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
      case (collected, (serviceKey, entries)) =>
        val stale = entries.filter(shouldRemove)
        if (stale.nonEmpty) collected + (serviceKey -> stale)
        else collected
    }

  if (removals.nonEmpty) {
    if (ctx.log.isDebugEnabled) {
      val removedDescription = removals
        .map {
          case (serviceKey, entries) => serviceKey.asServiceKey.id -> entries.mkString("[", ", ", "]")
        }
        .mkString(",")
      ctx.log.debugN(
        "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
        cluster.selfAddress,
        addresses.mkString(","),
        removedDescription)
    }

    // Shard the changes over the ddata keys they belong to.
    state.registry.entriesPerDdataKey(removals).foreach {
      case (ddataKey, removalsForKey) =>
        replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { current =>
          ServiceRegistry(current).removeAll(removalsForKey).toORMultiMap
        }
    }
  }
}
// Re-sends listings for the given keys to their subscribers, flagged as a change where
// no services were added or removed (only reachability differs).
def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newState: State): Unit =
  notifySubscribers(changedKeys = keysForNode, servicesWereAddedOrRemoved = false, newState = newState)
/**
 * Sends a fresh `Listing` to every subscriber of each changed key.
 *
 * @param changedKeys                service keys whose listing should be re-published
 * @param servicesWereAddedOrRemoved flag passed through unchanged into each `Listing`
 * @param newState                   state used to look up subscribers and active actor refs
 */
def notifySubscribers(
    changedKeys: Set[AbstractServiceKey],
    servicesWereAddedOrRemoved: Boolean,
    newState: State): Unit =
  for (changedKey <- changedKeys) {
    val serviceKey = changedKey.asServiceKey
    val interested = newState.subscriptions.get(changedKey)
    // Keys without subscribers are skipped, so no listing is computed for them.
    if (interested.nonEmpty) {
      val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
      val update =
        ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved)
      interested.foreach(subscriber => subscriber ! update)
    }
  }
/**
 * Handles the public receptionist protocol: `Register`, `Deregister`, `Find` and `Subscribe`.
 *
 * `Register`, `Deregister` and `Subscribe` are only accepted for actor refs whose path address
 * has local scope; for anything else an error is logged and the behavior is left unchanged.
 *
 * @param cmd the public command to process
 * @return a new behavior when the local state changed, `Behaviors.same` otherwise
 * @throws IllegalArgumentException for any command not handled by one of the cases below
 */
def onCommand(cmd: Command): Behavior[Command] = cmd match {
  case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
    if (serviceInstance.path.address.hasLocalScope) {
      val entry = Entry(serviceInstance, setup.selfSystemUid)(System.currentTimeMillis())
      ctx.log
        .debugN("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
      // the actor is already watched if it was registered for another service key before
      if (!state.servicesPerActor.contains(serviceInstance))
        ctx.watchWith(serviceInstance, LocalServiceActorTerminated(serviceInstance))
      // the acknowledgement is sent before the replicator update is issued
      maybeReplyTo.foreach(_ ! ReceptionistMessages.Registered(key, serviceInstance))
      val ddataKey = state.registry.ddataKeyFor(key)
      replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
        ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
      }
      behavior(setup, state.addLocalService(serviceInstance, key))
    } else {
      // two placeholders need two arguments: the node address first, then the rejected ref
      ctx.log.error2(
        "ClusterReceptionist [{}] - Register of non-local [{}] is not supported",
        cluster.selfAddress,
        serviceInstance)
      Behaviors.same
    }

  case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
    if (serviceInstance.path.address.hasLocalScope) {
      val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
      ctx.log.debugN(
        "ClusterReceptionist [{}] - Unregister actor: [{}] [{}]",
        cluster.selfAddress,
        key.asServiceKey.id,
        entry)
      // the removal is tombstoned (with a deadline) so the entry is not re-added by merging
      // with other concurrent registrations for the same key
      val newState = state.removeLocalService(serviceInstance, key, setup.newTombstoneDeadline())
      if (!newState.servicesPerActor.contains(serviceInstance)) {
        // last service for actor unregistered, stop watching
        ctx.unwatch(serviceInstance)
      }
      maybeReplyTo.foreach(_ ! ReceptionistMessages.Deregistered(key, serviceInstance))
      val ddataKey = state.registry.ddataKeyFor(key)
      replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
        ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
      }
      behavior(setup, newState)
    } else {
      ctx.log.error2(
        "ClusterReceptionist [{}] - Unregistering non-local [{}] is not supported",
        cluster.selfAddress,
        serviceInstance)
      Behaviors.same
    }

  case ReceptionistMessages.Find(key, replyTo) =>
    // one-off lookup answered from the current local state; no state change
    val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
    replyTo ! ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
    Behaviors.same

  case ReceptionistMessages.Subscribe(key, subscriber) =>
    if (subscriber.path.address.hasLocalScope) {
      ctx.watchWith(subscriber, SubscriberTerminated(subscriber))
      // immediately reply with the initial listing to the new subscriber
      val listing = {
        val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
        ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
      }
      subscriber ! listing
      behavior(setup, state.copy(subscriptions = state.subscriptions.inserted(key)(subscriber)))
    } else {
      ctx.log.error2(
        "ClusterReceptionist [{}] - Subscriptions from non-local [{}] is not supported",
        cluster.selfAddress,
        subscriber)
      Behaviors.same
    }

  case other =>
    // to please exhaustiveness check, compiler does not know about internal/public command
    throw new IllegalArgumentException(s"Unsupported receptionist command [${other.getClass.getName}]")
}
/**
 * Handles the internal protocol of the cluster receptionist: termination of watched
 * subscribers and local services, registry changes coming back from the replicator,
 * cluster membership and reachability events, and the two periodic ticks.
 *
 * @param cmd the internal command to process
 * @return a new behavior when the state changed, `Behaviors.same` when it did not, or
 *         `Behaviors.stopped` when this node itself was removed from the cluster
 */
def onInternalCommand(cmd: InternalCommand): Behavior[Command] = cmd match {
// a watched subscriber terminated: drop it from the state
case SubscriberTerminated(subscriber) =>
behavior(setup, state.removeSubscriber(subscriber))
// a watched, locally registered service actor terminated: remove its binding for every
// key it was registered with
case LocalServiceActorTerminated(serviceInstance) =>
// NOTE(review): the second argument list is 0L here, presumably because the timestamp
// does not take part in entry equality - confirm against Entry
val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
// could be empty if there was a race between termination and unregistration
val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
ctx.log.debugN(
"ClusterReceptionist [{}] - Registered actor terminated: [{}] [{}]",
cluster.selfAddress,
keys.map(_.asServiceKey.id).mkString(", "),
entry)
// one replicator update per key the terminated actor was registered with
keys.foreach { key =>
val ddataKey = state.registry.ddataKeyFor(key.asServiceKey)
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
ServiceRegistry(registry).removeBinding(key.asServiceKey, entry).toORMultiMap
}
}
// tombstone removals so they are not re-added by merging with other concurrent
// registrations for the same key
behavior(setup, state.addTombstone(serviceInstance, setup.newTombstoneDeadline()))
case ChangeFromReplicator(ddataKey, value) =>
// every change will come back this way - this is where the local notifications happen
val newRegistry = ServiceRegistry(value)
// computed against the registry held in the current (pre-update) state
val changedKeys = state.registry.collectChangedKeys(ddataKey, newRegistry)
val newState = state.copy(registry = state.registry.withServiceRegistry(ddataKey, newRegistry))
// the new state is only adopted when at least one key changed, see the else branch
if (changedKeys.nonEmpty) {
if (ctx.log.isDebugEnabled) {
ctx.log.debugN(
"ClusterReceptionist [{}] - Change from replicator: [{}], changes: [{}], tombstones [{}]",
cluster.selfAddress,
newRegistry.entries.entries,
changedKeys
.map(key => key.asServiceKey.id -> newRegistry.entriesFor(key).mkString("[", ", ", "]"))
.mkString(", "),
state.tombstones.mkString(", "))
}
// subscribers are notified before any tombstoned entries are re-removed below
notifySubscribers(changedKeys, servicesWereAddedOrRemoved = true, newState)
changedKeys.foreach { changedKey =>
val serviceKey = changedKey.asServiceKey
// because of how ORMultiMap/ORSet works, we could have a case where an actor we removed
// is re-introduced because of a concurrent update, in that case we need to re-remove it
val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(state.hasTombstone(serviceKey))
if (tombstonedButReAdded.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug2(
"ClusterReceptionist [{}] - Saw ActorRefs that were tomstoned [{}], re-removing.",
cluster.selfAddress,
tombstonedButReAdded.mkString(", "))
// a single update that removes all re-added refs for this service key
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
tombstonedButReAdded
.foldLeft(ServiceRegistry(registry)) { (acc, ref) =>
acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid)(0L))
}
.toORMultiMap
}
}
}
behavior(setup, newState)
} else {
Behaviors.same
}
case NodeAdded(uniqueAddress) =>
// a node that is already part of the registry's node set is ignored
if (state.registry.nodes.contains(uniqueAddress)) {
Behaviors.same
} else {
val newState = state.copy(registry = state.registry.addNode(uniqueAddress))
// keys are looked up in the new state, i.e. after the node was added
val keysForNode = newState.registry.keysFor(uniqueAddress)
if (keysForNode.nonEmpty) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Node with registered services added [{}]",
cluster.selfAddress,
uniqueAddress)
notifySubscribers(keysForNode, servicesWereAddedOrRemoved = true, newState)
} else {
ctx.log.debug2("ClusterReceptionist [{}] - Node added [{}]", cluster.selfAddress, uniqueAddress)
}
behavior(setup, newState)
}
case NodeRemoved(uniqueAddress) =>
if (uniqueAddress == selfUniqueAddress) {
ctx.log.debug("ClusterReceptionist [{}] - terminated/removed", cluster.selfAddress)
// If self cluster node is shutting down our own entries should have been removed via
// watch-Terminated or will be removed by other nodes. This point is anyway too late.
Behaviors.stopped
} else if (state.registry.nodes.contains(uniqueAddress)) {
// keys are looked up in the current state, i.e. before the node is removed
val keysForNode = state.registry.keysFor(uniqueAddress)
val newState = state.copy(registry = state.registry.removeNode(uniqueAddress))
if (keysForNode.nonEmpty) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Node with registered services removed [{}]",
cluster.selfAddress,
uniqueAddress)
notifySubscribers(keysForNode, servicesWereAddedOrRemoved = true, newState)
}
// Ok to update from several nodes but more efficient to try to do it from one node.
if (isLeader) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Leader node observed removed node [{}]",
cluster.selfAddress,
uniqueAddress)
// the node is known to be removed, so its entries are removed regardless of their age
nodesRemoved(Set(uniqueAddress), onlyRemoveOldEntries = false)
}
behavior(setup, newState)
} else {
// a node that is not part of the registry's node set is ignored
Behaviors.same
}
case NodeUnreachable(uniqueAddress) =>
val keysForNode = state.registry.keysFor(uniqueAddress)
val newState = state.copy(registry = state.registry.addUnreachable(uniqueAddress))
// subscribers are only notified when the node has registered services
if (keysForNode.nonEmpty) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Node with registered services unreachable [{}]",
cluster.selfAddress,
uniqueAddress)
reachabilityChanged(keysForNode, newState)
}
behavior(setup, newState)
case NodeReachable(uniqueAddress) =>
val keysForNode = state.registry.keysFor(uniqueAddress)
val newState = state.copy(registry = state.registry.removeUnreachable(uniqueAddress))
// subscribers are only notified when the node has registered services
if (keysForNode.nonEmpty) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Node with registered services reachable again [{}]",
cluster.selfAddress,
uniqueAddress)
reachabilityChanged(keysForNode, newState)
}
behavior(setup, newState)
case RemoveTick =>
// ok to update from several nodes but more efficient to try to do it from one node
if (isLeader) {
val allAddressesInState: Set[UniqueAddress] =
state.registry.allUniqueAddressesInState(setup.selfUniqueAddress)
// addresses that still appear in the registry state but are not in its node set
val notInCluster = allAddressesInState.diff(state.registry.nodes)
if (notInCluster.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug2(
"ClusterReceptionist [{}] - Leader node cleanup tick, removed nodes: [{}]",
cluster.selfAddress,
notInCluster.mkString(","))
// only entries that are old enough are removed on the periodic cleanup
nodesRemoved(notInCluster, onlyRemoveOldEntries = true)
}
}
// the tick never changes the local state; changes come back via the replicator
Behaviors.same
case PruneTombstonesTick =>
val prunedState = state.pruneTombstones()
// reference comparison - NOTE(review): presumably pruneTombstones returns the same
// instance when there was nothing to prune; confirm against State
if (prunedState eq state) Behaviors.same
else {
ctx.log.debug("ClusterReceptionist [{}] - Pruning tombstones", cluster.selfAddress)
behavior(setup, prunedState)
}
}
// Two heterogeneous kinds of messages are supported without union types: internal commands
// are matched first and routed to their own handler, every other command goes to the public
// handler, and a null message is reported as unhandled.
Behaviors.receiveMessage[Command] {
  case internal: InternalCommand => onInternalCommand(internal)
  case regular: Command          => onCommand(regular)
  case null                      => Behaviors.unhandled
}
}