in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [150:216]
private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
List<JobInstance> allJobInstance,
Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
HashMap<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();
log.info("allRunningJobMetricsFromEngine is {}", allRunningJobMetricsFromEngine.toString());
// Traverse all jobInstances in allJobInstance
for (JobInstance jobInstance : allJobInstance) {
log.info("jobEngineId={}", jobInstance.getJobEngineId());
if (jobInstance.getJobStatus() == null
|| jobInstance.getJobStatus() == JobStatus.FAILED
|| jobInstance.getJobStatus() == JobStatus.RUNNING) {
// Obtain monitoring information from the collection of running jobs returned from
// the engine
if (!allRunningJobMetricsFromEngine.isEmpty()
&& allRunningJobMetricsFromEngine.containsKey(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()))) {
JobSummaryMetricsRes jobMetricsFromEngineRes =
getRunningJobMetricsFromEngine(
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap,
jobInstance);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
} else {
log.info(
"The job does not exist on the engine, it is directly returned from the database");
JobSummaryMetricsRes jobMetricsFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
if (jobMetricsFromDb != null) {
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
}
if (jobInstance.getJobStatus() == JobStatus.RUNNING) {
// Set the job status of jobInstance and jobMetrics in the database to
// finished
jobInstance.setJobStatus(JobStatus.FINISHED);
jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
}
}
} else if (jobInstance.getJobStatus() == JobStatus.FINISHED
|| jobInstance.getJobStatus() == JobStatus.CANCELED) {
// If the status of the job is finished or cancelled, the monitoring information is
// directly obtained from MySQL
JobSummaryMetricsRes jobMetricsFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())));
log.info("jobStatus=finish oe canceled,JobSummaryMetricsRes={}", jobMetricsFromDb);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
}
}
return jobSummaryMetricsResMap;
}