in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [253:366]
private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
List<JobInstance> allJobInstance,
Integer userId,
Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
HashMap<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();
// Traverse all jobInstances in allJobInstance
for (JobInstance jobInstance : allJobInstance) {
if (jobInstance.getJobStatus() != null
&& jobInstance.getJobStatus().equals("CANCELED")) {
// If the status of the job is finished or cancelled
// the monitoring information is directly obtained from MySQL
JobSummaryMetricsRes jobMetriceFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())));
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetriceFromDb);
} else if (jobInstance.getJobStatus() != null
&& (jobInstance.getJobStatus().equals("FINISHED")
|| jobInstance.getJobStatus().equals("FAILED"))) {
// Obtain monitoring information from the collection of running jobs returned from
// the engine
if (!allRunningJobMetricsFromEngine.isEmpty()
&& allRunningJobMetricsFromEngine.containsKey(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()))) {
// If it can be found, update the information in MySQL and return it to the
// front-end data
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
userId);
/** Return data from the front-end */
JobSummaryMetricsRes jobMetricsFromEngineRes =
getRunningJobMetricsFromEngine(
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
jobInstance);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
} else {
// If not found, obtain information from MySQL
JobSummaryMetricsRes jobMetriceFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetriceFromDb);
}
} else {
// Obtain monitoring information from the collection of running jobs returned from
// the engine
if (!allRunningJobMetricsFromEngine.isEmpty()
&& allRunningJobMetricsFromEngine.containsKey(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()))) {
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
userId);
/** Return data from the front-end */
JobSummaryMetricsRes jobMetricsFromEngineRes =
getRunningJobMetricsFromEngine(
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
jobInstance);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
} else {
String jobStatusByJobEngineId =
getJobStatusByJobEngineId(
String.valueOf(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
if (jobStatusByJobEngineId != null) {
jobInstance.setJobStatus(jobStatusByJobEngineId);
jobInstanceDao.update(jobInstance);
JobSummaryMetricsRes jobSummaryMetricsResByDb =
getJobSummaryMetricsResByDb(
jobInstance,
userId,
String.valueOf(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
jobSummaryMetricsResMap.put(jobInstance.getId(), jobSummaryMetricsResByDb);
List<JobMetrics> jobMetricsFromDb =
getJobMetricsFromDb(
jobInstance,
userId,
String.valueOf(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
if (!jobMetricsFromDb.isEmpty()) {
jobMetricsFromDb.stream()
.forEach(
jobMetrics ->
jobMetrics.setStatus(jobStatusByJobEngineId));
for (JobMetrics jobMetrics : jobMetricsFromDb) {
jobMetricsDao.getJobMetricsMapper().updateById(jobMetrics);
}
}
}
}
}
}
return jobSummaryMetricsResMap;
}