in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [147:214]
private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
List<JobInstance> allJobInstance,
Integer userId,
Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
HashMap<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();
log.info("allRunningJobMetricsFromEngine is {}", allRunningJobMetricsFromEngine.toString());
// Traverse all jobInstances in allJobInstance
for (JobInstance jobInstance : allJobInstance) {
log.info("jobEngineId={}", jobInstance.getJobEngineId());
if (jobInstance.getJobStatus() == null
|| jobInstance.getJobStatus().equals("FAILED")
|| jobInstance.getJobStatus().equals("RUNNING")) {
// Obtain monitoring information from the collection of running jobs returned from
// the engine
if (!allRunningJobMetricsFromEngine.isEmpty()
&& allRunningJobMetricsFromEngine.containsKey(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()))) {
JobSummaryMetricsRes jobMetricsFromEngineRes =
getRunningJobMetricsFromEngine(
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
jobInstance);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
userId);
} else {
log.info(
"The job does not exist on the engine, it is directly returned from the database");
JobSummaryMetricsRes jobMetriceFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
if (jobMetriceFromDb != null) {
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetriceFromDb);
}
// 将数据库中的jobInstance和jobMetrics的作业状态改为finished
jobInstance.setJobStatus("FINISHED");
jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
}
} else if (jobInstance.getJobStatus().equals("FINISHED")
|| jobInstance.getJobStatus().equals("CANCELED")) {
// If the status of the job is finished or cancelled, the monitoring information is
// directly obtained from MySQL
JobSummaryMetricsRes jobMetriceFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())));
log.info("jobStatus=finish oe canceled,JobSummaryMetricsRes={}", jobMetriceFromDb);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetriceFromDb);
}
}
return jobSummaryMetricsResMap;
}