in pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala [41:140]
/**
 * Builds the map key identifying a node, in the form `host:port`.
 *
 * Both `host` and `port` are unwrapped with `.get`, so an address lacking
 * either one makes this method throw instead of returning a key.
 *
 * @param address the address of the node to build a key for
 * @return the node key, `host` and `port` joined by a colon
 */
def nodeKey(address: Address): String = {
  val host = address.host.get
  val port = address.port.get
  s"$host:$port"
}
/**
 * Creates the metrics behavior with the default intervals:
 * a measurement every second and a cleanup every minute.
 */
def apply: Behavior[Command] =
  apply(measureInterval = 1.second, cleanupInterval = 1.minute)
/**
 * Creates the behavior that replicates this node's heap metrics.
 *
 * The behavior:
 *  - on every `Tick`, reads the JVM heap usage and writes the used and max
 *    values for this node into the `usedHeap` / `maxHeap` replicated maps;
 *  - on a change of the `maxHeap` map, caches its entries locally;
 *  - on a change of the `usedHeap` map, computes the used-heap percentage per
 *    node from the cached max values and publishes a `UsedHeap` message on the
 *    system event stream;
 *  - tracks cluster membership from `MemberUp` / `MemberRemoved` events and
 *    stops when this node itself is removed;
 *  - on every `Cleanup`, removes entries of nodes that are not in the tracked
 *    membership set from both replicated maps.
 *
 * @param measureInterval delay between consecutive `Tick` messages
 * @param cleanupInterval delay between consecutive `Cleanup` messages
 * @return the behavior handling `Command` messages as described above
 */
def apply(measureInterval: FiniteDuration, cleanupInterval: FiniteDuration): Behavior[Command] =
  Behaviors.setup { context =>
    DistributedData.withReplicatorMessageAdapter[Command, LWWMap[String, Long]] { replicator =>
      Behaviors.withTimers[Command] { timers =>
        implicit val selfUniqueAddress: SelfUniqueAddress =
          DistributedData(context.system).selfUniqueAddress
        // Implicit definitions carry an explicit type (required by Scala 3).
        implicit val cluster: Cluster = Cluster(context.system)
        val node = nodeKey(cluster.selfMember.address)

        timers.startTimerWithFixedDelay(Tick, Tick, measureInterval)
        timers.startTimerWithFixedDelay(Cleanup, Cleanup, cleanupInterval)

        val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

        val UsedHeapKey = LWWMapKey[String, Long]("usedHeap")
        val MaxHeapKey = LWWMapKey[String, Long]("maxHeap")

        replicator.subscribe(UsedHeapKey, InternalSubscribeResponse.apply)
        replicator.subscribe(MaxHeapKey, InternalSubscribeResponse.apply)

        val memberUpRef = context.messageAdapter(InternalClusterMemberUp.apply)
        val memberRemovedRef = context.messageAdapter(InternalClusterMemberRemoved.apply)
        cluster.subscriptions ! Subscribe(memberUpRef, classOf[ClusterEvent.MemberUp])
        cluster.subscriptions ! Subscribe(memberRemovedRef, classOf[ClusterEvent.MemberRemoved])

        // Last observed content of the replicated max-heap map (node key -> bytes).
        var maxHeap = Map.empty[String, Long]
        // Node keys of members seen as up and not yet removed.
        var nodesInCluster = Set.empty[String]

        Behaviors.receiveMessage {
          case Tick =>
            val heap = memoryMBean.getHeapMemoryUsage
            val used = heap.getUsed
            val max = heap.getMax

            replicator.askUpdate(
              askReplyTo =>
                Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(_ :+ (node -> used)),
              InternalUpdateResponse.apply)

            replicator.askUpdate(
              askReplyTo =>
                Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo) { data =>
                  data.get(node) match {
                    case Some(`max`) => data // unchanged
                    case _           => data :+ (node -> max)
                  }
                },
              InternalUpdateResponse.apply)

            Behaviors.same

          case InternalSubscribeResponse(c @ Changed(MaxHeapKey)) =>
            maxHeap = c.get(MaxHeapKey).entries
            Behaviors.same

          case InternalSubscribeResponse(c @ Changed(UsedHeapKey)) =>
            val usedHeapPercent = UsedHeap(c.get(UsedHeapKey).entries.collect {
              // Only nodes with a known, positive max heap get a percentage:
              // MemoryUsage.getMax reports -1 when the maximum is undefined, and
              // dividing by a non-positive value would publish a negative,
              // infinite or NaN percentage.
              case (key, value) if maxHeap.get(key).exists(_ > 0) =>
                key -> (value.toDouble / maxHeap(key)) * 100.0
            })
            context.log.debug2("Node {} observed:\n{}", node, usedHeapPercent)
            context.system.eventStream ! EventStream.Publish(usedHeapPercent)
            Behaviors.same

          case InternalSubscribeResponse(_)                 => Behaviors.same // ok
          case InternalUpdateResponse(_: UpdateResponse[_]) => Behaviors.same // ok

          case InternalClusterMemberUp(ClusterEvent.MemberUp(m)) =>
            nodesInCluster += nodeKey(m.address)
            Behaviors.same

          case InternalClusterMemberRemoved(ClusterEvent.MemberRemoved(m, _)) =>
            nodesInCluster -= nodeKey(m.address)
            // Stop once this node itself has been removed from the cluster.
            if (m.address == cluster.selfMember.uniqueAddress.address)
              Behaviors.stopped
            else
              Behaviors.same

          case Cleanup =>
            // Drops every entry whose key is not a currently tracked node.
            def cleanupRemoved(data: LWWMap[String, Long]): LWWMap[String, Long] =
              (data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) =>
                d.remove(selfUniqueAddress, key)
              }

            replicator.askUpdate(
              askReplyTo => Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(cleanupRemoved),
              InternalUpdateResponse.apply)

            replicator.askUpdate(
              askReplyTo => Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(cleanupRemoved),
              InternalUpdateResponse.apply)

            Behaviors.same
        }
      }
    }
  }