in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java [232:306]
public Map<String, Serializable> dashboard(Long teamId) {
JobsOverview.Task overview = new JobsOverview.Task();
Integer totalJmMemory = 0;
Integer totalTmMemory = 0;
Integer totalTm = 0;
Integer totalSlot = 0;
Integer availableSlot = 0;
Integer runningJob = 0;
// stat metrics from other than kubernetes mode
for (Application app : FlinkHttpWatcher.getWatchingApps()) {
if (!teamId.equals(app.getTeamId())) {
continue;
}
if (app.getJmMemory() != null) {
totalJmMemory += app.getJmMemory();
}
if (app.getTmMemory() != null) {
totalTmMemory += app.getTmMemory() * (app.getTotalTM() == null ? 1 : app.getTotalTM());
}
if (app.getTotalTM() != null) {
totalTm += app.getTotalTM();
}
if (app.getTotalSlot() != null) {
totalSlot += app.getTotalSlot();
}
if (app.getAvailableSlot() != null) {
availableSlot += app.getAvailableSlot();
}
if (app.getState() == FlinkAppState.RUNNING.getValue()) {
runningJob++;
}
JobsOverview.Task task = app.getOverview();
if (task != null) {
overview.setTotal(overview.getTotal() + task.getTotal());
overview.setCreated(overview.getCreated() + task.getCreated());
overview.setScheduled(overview.getScheduled() + task.getScheduled());
overview.setDeploying(overview.getDeploying() + task.getDeploying());
overview.setRunning(overview.getRunning() + task.getRunning());
overview.setFinished(overview.getFinished() + task.getFinished());
overview.setCanceling(overview.getCanceling() + task.getCanceling());
overview.setCanceled(overview.getCanceled() + task.getCanceled());
overview.setFailed(overview.getFailed() + task.getFailed());
overview.setReconciling(overview.getReconciling() + task.getReconciling());
}
}
// merge metrics from flink kubernetes cluster
FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
if (k8sMetric != null) {
totalJmMemory += k8sMetric.totalJmMemory();
totalTmMemory += k8sMetric.totalTmMemory();
totalTm += k8sMetric.totalTm();
totalSlot += k8sMetric.totalSlot();
availableSlot += k8sMetric.availableSlot();
runningJob += k8sMetric.runningJob();
overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
overview.setRunning(overview.getRunning() + k8sMetric.runningJob());
overview.setFinished(overview.getFinished() + k8sMetric.finishedJob());
overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob());
overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
}
// result json
Map<String, Serializable> map = new HashMap<>(8);
map.put("task", overview);
map.put("jmMemory", totalJmMemory);
map.put("tmMemory", totalTmMemory);
map.put("totalTM", totalTm);
map.put("availableSlot", availableSlot);
map.put("totalSlot", totalSlot);
map.put("runningJob", runningJob);
return map;
}