public String generateRealTimeTrackingMetrics()

in hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java [326:558]


  /**
   * Builds the JSON object returned to the real-time tracking page.
   *
   * <p>The metric objects (gauges, histograms, counters) are looked up in
   * {@code metrics} the first time they are available and then cached in the
   * corresponding instance fields, so later calls only read their values.
   * A metric that is not registered yet is reported as zero.
   *
   * <p>Commit throughput is derived from the difference between the current
   * counter values and the values remembered from the previous call, divided
   * by the wall-clock time elapsed between the two calls. The first call, and
   * any call for which no positive amount of time has elapsed, reports a
   * throughput of zero.
   *
   * <p>NOTE(review): the JVM gauges are divided by 1024^3 and the cluster
   * memory gauges by 1024, which suggests bytes and MB respectively; the
   * histogram means are divided by 1,000,000, which suggests nanoseconds
   * converted to milliseconds. Confirm against the code registering them.
   *
   * @return a JSON object, as a string, holding the current metric values
   */
  public String generateRealTimeTrackingMetrics() {
    // JVM memory. Map.get yields null for a gauge that is not registered
    // yet, which leaves the cache field empty for the next call to retry.
    if (jvmFreeMemoryGauge == null) {
      jvmFreeMemoryGauge = metrics.getGauges().get("variable.jvm.free.memory");
    }
    if (jvmMaxMemoryGauge == null) {
      jvmMaxMemoryGauge = metrics.getGauges().get("variable.jvm.max.memory");
    }
    if (jvmTotalMemoryGauge == null) {
      jvmTotalMemoryGauge = metrics.getGauges()
          .get("variable.jvm.total.memory");
    }
    double jvmFreeMemoryGB = jvmFreeMemoryGauge == null ? 0 :
        gaugeValueAsDouble(jvmFreeMemoryGauge.getValue()) / 1024 / 1024 / 1024;
    double jvmMaxMemoryGB = jvmMaxMemoryGauge == null ? 0 :
        gaugeValueAsDouble(jvmMaxMemoryGauge.getValue()) / 1024 / 1024 / 1024;
    double jvmTotalMemoryGB = jvmTotalMemoryGauge == null ? 0 :
        gaugeValueAsDouble(jvmTotalMemoryGauge.getValue()) / 1024 / 1024 / 1024;

    // number of running applications/containers
    if (numRunningAppsGauge == null) {
      numRunningAppsGauge =
          metrics.getGauges().get("variable.running.application");
    }
    if (numRunningContainersGauge == null) {
      numRunningContainersGauge =
          metrics.getGauges().get("variable.running.container");
    }
    String numRunningApps = numRunningAppsGauge == null ? "0" :
        numRunningAppsGauge.getValue().toString();
    String numRunningContainers = numRunningContainersGauge == null ? "0" :
        numRunningContainersGauge.getValue().toString();

    // cluster available/allocated resource
    if (allocatedMemoryGauge == null) {
      allocatedMemoryGauge = metrics.getGauges()
          .get("variable.cluster.allocated.memory");
    }
    if (allocatedVCoresGauge == null) {
      allocatedVCoresGauge = metrics.getGauges()
          .get("variable.cluster.allocated.vcores");
    }
    if (availableMemoryGauge == null) {
      availableMemoryGauge = metrics.getGauges()
          .get("variable.cluster.available.memory");
    }
    if (availableVCoresGauge == null) {
      availableVCoresGauge = metrics.getGauges()
          .get("variable.cluster.available.vcores");
    }
    double allocatedMemoryGB = allocatedMemoryGauge == null ? 0 :
        gaugeValueAsDouble(allocatedMemoryGauge.getValue()) / 1024;
    double allocatedVCores = allocatedVCoresGauge == null ? 0 :
        gaugeValueAsDouble(allocatedVCoresGauge.getValue());
    double availableMemoryGB = availableMemoryGauge == null ? 0 :
        gaugeValueAsDouble(availableMemoryGauge.getValue()) / 1024;
    double availableVCores = availableVCoresGauge == null ? 0 :
        gaugeValueAsDouble(availableVCoresGauge.getValue());

    // scheduler operation
    if (allocateTimecostHistogram == null) {
      allocateTimecostHistogram = metrics.getHistograms()
          .get("sampler.scheduler.operation.allocate.timecost");
    }
    if (commitSuccessTimecostHistogram == null) {
      commitSuccessTimecostHistogram = metrics.getHistograms()
          .get("sampler.scheduler.operation.commit.success.timecost");
    }
    if (commitFailureTimecostHistogram == null) {
      commitFailureTimecostHistogram = metrics.getHistograms()
          .get("sampler.scheduler.operation.commit.failure.timecost");
    }
    if (handleTimecostHistogram == null) {
      handleTimecostHistogram = metrics.getHistograms()
          .get("sampler.scheduler.operation.handle.timecost");
    }
    double allocateTimecost = allocateTimecostHistogram == null ? 0.0 :
        allocateTimecostHistogram.getSnapshot().getMean() / 1000000;
    double commitSuccessTimecost =
        commitSuccessTimecostHistogram == null ? 0.0 :
            commitSuccessTimecostHistogram.getSnapshot().getMean() / 1000000;
    double commitFailureTimecost =
        commitFailureTimecostHistogram == null ? 0.0 :
            commitFailureTimecostHistogram.getSnapshot().getMean() / 1000000;
    double handleTimecost = handleTimecostHistogram == null ? 0.0 :
        handleTimecostHistogram.getSnapshot().getMean() / 1000000;

    // various handle operation; the JSON fragment is built right here so
    // that every emitted key is paired with the value computed for it
    StringBuilder handleOperJson = new StringBuilder();
    for (SchedulerEventType e : SchedulerEventType.values()) {
      String key = "sampler.scheduler.operation.handle." + e + ".timecost";
      if (!handleOperTimecostHistogramMap.containsKey(e)
          && metrics.getHistograms().containsKey(key)) {
        handleOperTimecostHistogramMap.put(e, metrics.getHistograms().get(key));
      }
      double timecost = handleOperTimecostHistogramMap.containsKey(e) ?
          handleOperTimecostHistogramMap.get(e).getSnapshot().getMean()/1000000
              : 0;
      handleOperJson.append(",\"scheduler.handle-").append(e)
          .append(".timecost\":").append(timecost);
    }

    // allocated resource for each queue; the queue set is walked exactly
    // once, so a queue showing up while this method runs can never be
    // emitted with a missing ("null") value
    StringBuilder queueJson = new StringBuilder();
    for (String queue : wrapper.getTracker().getQueueSet()) {
      // memory
      String key = "counter.queue." + queue + ".allocated.memory";
      if (!queueAllocatedMemoryCounterMap.containsKey(queue)
          && metrics.getCounters().containsKey(key)) {
        queueAllocatedMemoryCounterMap.put(queue,
            metrics.getCounters().get(key));
      }
      double queueAllocatedMemoryGB =
          queueAllocatedMemoryCounterMap.containsKey(queue) ?
              queueAllocatedMemoryCounterMap.get(queue).getCount() / 1024.0
              : 0;
      // vCores
      key = "counter.queue." + queue + ".allocated.cores";
      if (!queueAllocatedVCoresCounterMap.containsKey(queue)
          && metrics.getCounters().containsKey(key)) {
        queueAllocatedVCoresCounterMap.put(
            queue, metrics.getCounters().get(key));
      }
      long queueAllocatedVCores =
          queueAllocatedVCoresCounterMap.containsKey(queue) ?
              queueAllocatedVCoresCounterMap.get(queue).getCount() : 0;
      queueJson.append(",\"queue.").append(queue)
          .append(".allocated.memory\":").append(queueAllocatedMemoryGB);
      queueJson.append(",\"queue.").append(queue)
          .append(".allocated.vcores\":").append(queueAllocatedVCores);
    }

    // calculate commit throughput, unit is number/second
    if (schedulerCommitSuccessCounter == null) {
      schedulerCommitSuccessCounter = metrics.getCounters()
          .get("counter.scheduler.operation.commit.success");
    }
    if (schedulerCommitFailureCounter == null) {
      schedulerCommitFailureCounter = metrics.getCounters()
          .get("counter.scheduler.operation.commit.failure");
    }
    long schedulerCommitSuccessThroughput = 0;
    long schedulerCommitFailureThroughput = 0;
    if (schedulerCommitSuccessCounter != null
        && schedulerCommitFailureCounter != null) {
      long currentTrackingTime = System.currentTimeMillis();
      long currentCommitSuccessCount =
          schedulerCommitSuccessCounter.getCount();
      long currentCommitFailureCount =
          schedulerCommitFailureCounter.getCount();
      if (lastTrackingTime != null) {
        long elapsedMillis = currentTrackingTime - lastTrackingTime;
        // Two calls within the same millisecond, or a wall clock that was
        // stepped backwards, would otherwise divide by zero or by a negative
        // interval; Math.round would then report Long.MAX_VALUE or a
        // negative rate. Report zero for such a sample instead.
        if (elapsedMillis > 0) {
          double intervalSeconds = (double) elapsedMillis / 1000;
          schedulerCommitSuccessThroughput = Math.round(
              (currentCommitSuccessCount
                  - lastSchedulerCommitSuccessCount) / intervalSeconds);
          schedulerCommitFailureThroughput = Math.round(
              (currentCommitFailureCount
                  - lastSchedulerCommitFailureCount) / intervalSeconds);
        }
      }
      // remember this sample as the baseline for the next call
      lastTrackingTime = currentTrackingTime;
      lastSchedulerCommitSuccessCount = currentCommitSuccessCount;
      lastSchedulerCommitFailureCount = currentCommitFailureCount;
    }

    // package results
    StringBuilder sb = new StringBuilder();
    sb.append("{");
    sb.append("\"time\":").append(System.currentTimeMillis())
        .append(",\"jvm.free.memory\":").append(jvmFreeMemoryGB)
        .append(",\"jvm.max.memory\":").append(jvmMaxMemoryGB)
        .append(",\"jvm.total.memory\":").append(jvmTotalMemoryGB)
        .append(",\"running.applications\":").append(numRunningApps)
        .append(",\"running.containers\":").append(numRunningContainers)
        .append(",\"cluster.allocated.memory\":").append(allocatedMemoryGB)
        .append(",\"cluster.allocated.vcores\":").append(allocatedVCores)
        .append(",\"cluster.available.memory\":").append(availableMemoryGB)
        .append(",\"cluster.available.vcores\":").append(availableVCores);
    sb.append(queueJson);
    // scheduler allocate & handle
    sb.append(",\"scheduler.allocate.timecost\":").append(allocateTimecost);
    sb.append(",\"scheduler.commit.success.timecost\":")
        .append(commitSuccessTimecost);
    sb.append(",\"scheduler.commit.failure.timecost\":")
        .append(commitFailureTimecost);
    sb.append(",\"scheduler.commit.success.throughput\":")
        .append(schedulerCommitSuccessThroughput);
    sb.append(",\"scheduler.commit.failure.throughput\":")
        .append(schedulerCommitFailureThroughput);
    sb.append(",\"scheduler.handle.timecost\":").append(handleTimecost);
    sb.append(handleOperJson);
    sb.append(generateNodeUsageMetrics("memory"));
    sb.append(generateNodeUsageMetrics("vcores"));
    sb.append("}");
    return sb.toString();
  }

  /**
   * Converts a gauge reading to a double by parsing its string form.
   *
   * @param gaugeValue the object returned by a gauge's {@code getValue()}
   * @return the reading parsed with {@link Double#parseDouble(String)}
   */
  private static double gaugeValueAsDouble(Object gaugeValue) {
    return Double.parseDouble(gaugeValue.toString());
  }