in hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java [326:558]
public String generateRealTimeTrackingMetrics() {
// JVM
double jvmFreeMemoryGB, jvmMaxMemoryGB, jvmTotalMemoryGB;
if (jvmFreeMemoryGauge == null &&
metrics.getGauges().containsKey("variable.jvm.free.memory")) {
jvmFreeMemoryGauge = metrics.getGauges().get("variable.jvm.free.memory");
}
if (jvmMaxMemoryGauge == null &&
metrics.getGauges().containsKey("variable.jvm.max.memory")) {
jvmMaxMemoryGauge = metrics.getGauges().get("variable.jvm.max.memory");
}
if (jvmTotalMemoryGauge == null &&
metrics.getGauges().containsKey("variable.jvm.total.memory")) {
jvmTotalMemoryGauge = metrics.getGauges()
.get("variable.jvm.total.memory");
}
jvmFreeMemoryGB = jvmFreeMemoryGauge == null ? 0 :
Double.parseDouble(jvmFreeMemoryGauge.getValue().toString())
/1024/1024/1024;
jvmMaxMemoryGB = jvmMaxMemoryGauge == null ? 0 :
Double.parseDouble(jvmMaxMemoryGauge.getValue().toString())
/1024/1024/1024;
jvmTotalMemoryGB = jvmTotalMemoryGauge == null ? 0 :
Double.parseDouble(jvmTotalMemoryGauge.getValue().toString())
/1024/1024/1024;
// number of running applications/containers
String numRunningApps, numRunningContainers;
if (numRunningAppsGauge == null &&
metrics.getGauges().containsKey("variable.running.application")) {
numRunningAppsGauge =
metrics.getGauges().get("variable.running.application");
}
if (numRunningContainersGauge == null &&
metrics.getGauges().containsKey("variable.running.container")) {
numRunningContainersGauge =
metrics.getGauges().get("variable.running.container");
}
numRunningApps = numRunningAppsGauge == null ? "0" :
numRunningAppsGauge.getValue().toString();
numRunningContainers = numRunningContainersGauge == null ? "0" :
numRunningContainersGauge.getValue().toString();
// cluster available/allocate resource
double allocatedMemoryGB, allocatedVCoresGB,
availableMemoryGB, availableVCoresGB;
if (allocatedMemoryGauge == null &&
metrics.getGauges()
.containsKey("variable.cluster.allocated.memory")) {
allocatedMemoryGauge = metrics.getGauges()
.get("variable.cluster.allocated.memory");
}
if (allocatedVCoresGauge == null &&
metrics.getGauges()
.containsKey("variable.cluster.allocated.vcores")) {
allocatedVCoresGauge = metrics.getGauges()
.get("variable.cluster.allocated.vcores");
}
if (availableMemoryGauge == null &&
metrics.getGauges()
.containsKey("variable.cluster.available.memory")) {
availableMemoryGauge = metrics.getGauges()
.get("variable.cluster.available.memory");
}
if (availableVCoresGauge == null &&
metrics.getGauges()
.containsKey("variable.cluster.available.vcores")) {
availableVCoresGauge = metrics.getGauges()
.get("variable.cluster.available.vcores");
}
allocatedMemoryGB = allocatedMemoryGauge == null ? 0 :
Double.parseDouble(allocatedMemoryGauge.getValue().toString())/1024;
allocatedVCoresGB = allocatedVCoresGauge == null ? 0 :
Double.parseDouble(allocatedVCoresGauge.getValue().toString());
availableMemoryGB = availableMemoryGauge == null ? 0 :
Double.parseDouble(availableMemoryGauge.getValue().toString())/1024;
availableVCoresGB = availableVCoresGauge == null ? 0 :
Double.parseDouble(availableVCoresGauge.getValue().toString());
// scheduler operation
double allocateTimecost, commitSuccessTimecost, commitFailureTimecost,
handleTimecost;
if (allocateTimecostHistogram == null &&
metrics.getHistograms().containsKey(
"sampler.scheduler.operation.allocate.timecost")) {
allocateTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.allocate.timecost");
}
if (commitSuccessTimecostHistogram == null &&
metrics.getHistograms().containsKey(
"sampler.scheduler.operation.commit.success.timecost")) {
commitSuccessTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.commit.success.timecost");
}
if (commitFailureTimecostHistogram == null &&
metrics.getHistograms().containsKey(
"sampler.scheduler.operation.commit.failure.timecost")) {
commitFailureTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.commit.failure.timecost");
}
if (handleTimecostHistogram == null &&
metrics.getHistograms().containsKey(
"sampler.scheduler.operation.handle.timecost")) {
handleTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.handle.timecost");
}
allocateTimecost = allocateTimecostHistogram == null ? 0.0 :
allocateTimecostHistogram.getSnapshot().getMean()/1000000;
commitSuccessTimecost = commitSuccessTimecostHistogram == null ? 0.0 :
commitSuccessTimecostHistogram.getSnapshot().getMean()/1000000;
commitFailureTimecost = commitFailureTimecostHistogram == null ? 0.0 :
commitFailureTimecostHistogram.getSnapshot().getMean()/1000000;
handleTimecost = handleTimecostHistogram == null ? 0.0 :
handleTimecostHistogram.getSnapshot().getMean()/1000000;
// various handle operation
Map<SchedulerEventType, Double> handleOperTimecostMap =
new HashMap<SchedulerEventType, Double>();
for (SchedulerEventType e : SchedulerEventType.values()) {
String key = "sampler.scheduler.operation.handle." + e + ".timecost";
if (! handleOperTimecostHistogramMap.containsKey(e) &&
metrics.getHistograms().containsKey(key)) {
handleOperTimecostHistogramMap.put(e, metrics.getHistograms().get(key));
}
double timecost = handleOperTimecostHistogramMap.containsKey(e) ?
handleOperTimecostHistogramMap.get(e).getSnapshot().getMean()/1000000
: 0;
handleOperTimecostMap.put(e, timecost);
}
// allocated resource for each queue
Map<String, Double> queueAllocatedMemoryMap = new HashMap<String, Double>();
Map<String, Long> queueAllocatedVCoresMap = new HashMap<String, Long>();
for (String queue : wrapper.getTracker().getQueueSet()) {
// memory
String key = "counter.queue." + queue + ".allocated.memory";
if (! queueAllocatedMemoryCounterMap.containsKey(queue) &&
metrics.getCounters().containsKey(key)) {
queueAllocatedMemoryCounterMap.put(queue,
metrics.getCounters().get(key));
}
double queueAllocatedMemoryGB =
queueAllocatedMemoryCounterMap.containsKey(queue) ?
queueAllocatedMemoryCounterMap.get(queue).getCount()/1024.0
: 0;
queueAllocatedMemoryMap.put(queue, queueAllocatedMemoryGB);
// vCores
key = "counter.queue." + queue + ".allocated.cores";
if (! queueAllocatedVCoresCounterMap.containsKey(queue) &&
metrics.getCounters().containsKey(key)) {
queueAllocatedVCoresCounterMap.put(
queue, metrics.getCounters().get(key));
}
long queueAllocatedVCores =
queueAllocatedVCoresCounterMap.containsKey(queue) ?
queueAllocatedVCoresCounterMap.get(queue).getCount(): 0;
queueAllocatedVCoresMap.put(queue, queueAllocatedVCores);
}
// calculate commit throughput, unit is number/second
if (schedulerCommitSuccessCounter == null && metrics.getCounters()
.containsKey("counter.scheduler.operation.commit.success")) {
schedulerCommitSuccessCounter = metrics.getCounters()
.get("counter.scheduler.operation.commit.success");
}
if (schedulerCommitFailureCounter == null && metrics.getCounters()
.containsKey("counter.scheduler.operation.commit.failure")) {
schedulerCommitFailureCounter = metrics.getCounters()
.get("counter.scheduler.operation.commit.failure");
}
long schedulerCommitSuccessThroughput = 0;
long schedulerCommitFailureThroughput = 0;
if (schedulerCommitSuccessCounter != null
&& schedulerCommitFailureCounter != null) {
long currentTrackingTime = System.currentTimeMillis();
long currentSchedulerCommitSucessCount =
schedulerCommitSuccessCounter.getCount();
long currentSchedulerCommitFailureCount =
schedulerCommitFailureCounter.getCount();
if (lastTrackingTime != null) {
double intervalSeconds =
(double) (currentTrackingTime - lastTrackingTime) / 1000;
schedulerCommitSuccessThroughput = Math.round(
(currentSchedulerCommitSucessCount
- lastSchedulerCommitSuccessCount) / intervalSeconds);
schedulerCommitFailureThroughput = Math.round(
(currentSchedulerCommitFailureCount
- lastSchedulerCommitFailureCount) / intervalSeconds);
}
lastTrackingTime = currentTrackingTime;
lastSchedulerCommitSuccessCount = currentSchedulerCommitSucessCount;
lastSchedulerCommitFailureCount = currentSchedulerCommitFailureCount;
}
// package results
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("\"time\":" ).append(System.currentTimeMillis())
.append(",\"jvm.free.memory\":").append(jvmFreeMemoryGB)
.append(",\"jvm.max.memory\":").append(jvmMaxMemoryGB)
.append(",\"jvm.total.memory\":").append(jvmTotalMemoryGB)
.append(",\"running.applications\":").append(numRunningApps)
.append(",\"running.containers\":").append(numRunningContainers)
.append(",\"cluster.allocated.memory\":").append(allocatedMemoryGB)
.append(",\"cluster.allocated.vcores\":").append(allocatedVCoresGB)
.append(",\"cluster.available.memory\":").append(availableMemoryGB)
.append(",\"cluster.available.vcores\":").append(availableVCoresGB);
for (String queue : wrapper.getTracker().getQueueSet()) {
sb.append(",\"queue.").append(queue).append(".allocated.memory\":")
.append(queueAllocatedMemoryMap.get(queue));
sb.append(",\"queue.").append(queue).append(".allocated.vcores\":")
.append(queueAllocatedVCoresMap.get(queue));
}
// scheduler allocate & handle
sb.append(",\"scheduler.allocate.timecost\":").append(allocateTimecost);
sb.append(",\"scheduler.commit.success.timecost\":")
.append(commitSuccessTimecost);
sb.append(",\"scheduler.commit.failure.timecost\":")
.append(commitFailureTimecost);
sb.append(",\"scheduler.commit.success.throughput\":")
.append(schedulerCommitSuccessThroughput);
sb.append(",\"scheduler.commit.failure.throughput\":")
.append(schedulerCommitFailureThroughput);
sb.append(",\"scheduler.handle.timecost\":").append(handleTimecost);
for (SchedulerEventType e : SchedulerEventType.values()) {
sb.append(",\"scheduler.handle-").append(e).append(".timecost\":")
.append(handleOperTimecostMap.get(e));
}
sb.append(generateNodeUsageMetrics("memory"));
sb.append(generateNodeUsageMetrics("vcores"));
sb.append("}");
return sb.toString();
}