def nodeKey()

in pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala [41:140]


  def nodeKey(address: Address): String = address.host.get + ":" + address.port.get

  def apply: Behavior[Command] = apply(1.second, 1.minute)

  def apply(measureInterval: FiniteDuration, cleanupInterval: FiniteDuration): Behavior[Command] =
    Behaviors.setup { context =>
      DistributedData.withReplicatorMessageAdapter[Command, LWWMap[String, Long]] { replicator =>
        Behaviors.withTimers[Command] { timers =>
          implicit val selfUniqueAddress: SelfUniqueAddress =
            DistributedData(context.system).selfUniqueAddress
          implicit val cluster = Cluster(context.system)
          val node = nodeKey(cluster.selfMember.address)

          timers.startTimerWithFixedDelay(Tick, Tick, measureInterval)
          timers.startTimerWithFixedDelay(Cleanup, Cleanup, cleanupInterval)
          val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

          val UsedHeapKey = LWWMapKey[String, Long]("usedHeap")
          val MaxHeapKey = LWWMapKey[String, Long]("maxHeap")

          replicator.subscribe(UsedHeapKey, InternalSubscribeResponse.apply)
          replicator.subscribe(MaxHeapKey, InternalSubscribeResponse.apply)

          val memberUpRef = context.messageAdapter(InternalClusterMemberUp.apply)
          val memberRemovedRef = context.messageAdapter(InternalClusterMemberRemoved.apply)
          cluster.subscriptions ! Subscribe(memberUpRef, classOf[ClusterEvent.MemberUp])
          cluster.subscriptions ! Subscribe(memberRemovedRef, classOf[ClusterEvent.MemberRemoved])

          var maxHeap = Map.empty[String, Long]
          var nodesInCluster = Set.empty[String]

          Behaviors.receiveMessage {
            case Tick =>
              val heap = memoryMBean.getHeapMemoryUsage
              val used = heap.getUsed
              val max = heap.getMax

              replicator.askUpdate(
                askReplyTo =>
                  Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(_ :+ (node -> used)),
                InternalUpdateResponse.apply)

              replicator.askUpdate(
                askReplyTo =>
                  Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo) { data =>
                    data.get(node) match {
                      case Some(`max`) => data // unchanged
                      case _           => data :+ (node -> max)
                    }
                  },
                InternalUpdateResponse.apply)

              Behaviors.same

            case InternalSubscribeResponse(c @ Changed(MaxHeapKey)) =>
              maxHeap = c.get(MaxHeapKey).entries
              Behaviors.same

            case InternalSubscribeResponse(c @ Changed(UsedHeapKey)) =>
              val usedHeapPercent = UsedHeap(c.get(UsedHeapKey).entries.collect {
                case (key, value) if maxHeap.contains(key) =>
                  key -> (value.toDouble / maxHeap(key)) * 100.0
              })
              context.log.debug2("Node {} observed:\n{}", node, usedHeapPercent)
              context.system.eventStream ! EventStream.Publish(usedHeapPercent)
              Behaviors.same

            case InternalSubscribeResponse(_)                 => Behaviors.same // ok
            case InternalUpdateResponse(_: UpdateResponse[_]) => Behaviors.same // ok

            case InternalClusterMemberUp(ClusterEvent.MemberUp(m)) =>
              nodesInCluster += nodeKey(m.address)
              Behaviors.same

            case InternalClusterMemberRemoved(ClusterEvent.MemberRemoved(m, _)) =>
              nodesInCluster -= nodeKey(m.address)
              if (m.address == cluster.selfMember.uniqueAddress.address)
                Behaviors.stopped
              else
                Behaviors.same

            case Cleanup =>
              def cleanupRemoved(data: LWWMap[String, Long]): LWWMap[String, Long] =
                (data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) =>
                  d.remove(selfUniqueAddress, key)
                }

              replicator.askUpdate(
                askReplyTo => Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(cleanupRemoved),
                InternalUpdateResponse.apply)

              replicator.askUpdate(
                askReplyTo => Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(cleanupRemoved),
                InternalUpdateResponse.apply)

              Behaviors.same
          }
        }
      }
    }