in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [252:374]
/**
 * Collects the summary metrics of every given job instance, keyed by job instance id.
 *
 * <p>For each instance the source of the metrics is chosen in this order:
 *
 * <ol>
 *   <li>Status is {@code CANCELED}: the summary is read via
 *       {@code getJobSummaryMetricsResByDb}; the engine map is not consulted.
 *   <li>The engine metrics map contains the instance's engine id (whatever the status, including
 *       {@code FINISHED}/{@code FAILED}): {@code modifyAndUpdateJobInstanceAndJobMetrics} is
 *       called first (presumably to persist the fresh engine data - confirm against that
 *       method), then the summary from {@code getRunningJobMetricsFromEngine} is returned.
 *   <li>Status is {@code FINISHED} or {@code FAILED} and the engine has no metrics: the summary
 *       is read via {@code getJobSummaryMetricsResByDb}.
 *   <li>Any other status (including {@code null}) and the engine has no metrics: the status is
 *       looked up with {@code getJobStatusByJobEngineId}. If a status is obtained, it is written
 *       to the instance and to each of its stored {@code JobMetrics} rows, and the summary is
 *       read via {@code getJobSummaryMetricsResByDb}. If the lookup throws or returns
 *       {@code null}, a warning is logged and the instance is left out of the result.
 * </ol>
 *
 * @param allJobInstance job instances to report on
 * @param allRunningJobMetricsFromEngine metrics reported by the engine, keyed by job engine id
 * @param jobInstanceIdAndJobEngineIdMap job instance id to job engine id
 * @return summary metrics keyed by job instance id; may lack entries for instances whose status
 *     could not be determined
 * @throws RuntimeException wrapping any exception raised while processing an instance, except a
 *     failed engine status lookup, which is only logged
 */
private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
        List<JobInstance> allJobInstance,
        Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
        Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
    Map<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();
    for (JobInstance jobInstance : allJobInstance) {
        try {
            // Resolve the engine id and the stored status once per instance instead of
            // repeating the lookups in every branch.
            Long jobEngineId = jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId());
            JobStatus storedStatus = jobInstance.getJobStatus();

            if (storedStatus == JobStatus.CANCELED) {
                // Canceled jobs are always served from the database.
                // NOTE(review): Long.toString unboxes, so a missing engine id raises a
                // NullPointerException here (wrapped below), whereas the status-refresh
                // branch passes the text "null" instead - confirm which is intended.
                JobSummaryMetricsRes summaryFromDb =
                        getJobSummaryMetricsResByDb(jobInstance, Long.toString(jobEngineId));
                jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromDb);
            } else if (!allRunningJobMetricsFromEngine.isEmpty()
                    && allRunningJobMetricsFromEngine.containsKey(jobEngineId)) {
                // The isEmpty() guard is kept on purpose: some Map implementations throw
                // on containsKey(null), and the engine id may be absent.
                // The engine still reports this job: sync its data, then answer from the
                // engine metrics.
                modifyAndUpdateJobInstanceAndJobMetrics(
                        jobInstance,
                        allRunningJobMetricsFromEngine,
                        jobInstanceIdAndJobEngineIdMap);
                JobSummaryMetricsRes summaryFromEngine =
                        getRunningJobMetricsFromEngine(
                                allRunningJobMetricsFromEngine,
                                jobInstanceIdAndJobEngineIdMap,
                                jobInstance);
                jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromEngine);
            } else if (storedStatus == JobStatus.FINISHED
                    || storedStatus == JobStatus.FAILED) {
                // Terminal job that the engine no longer reports: fall back to the database.
                JobSummaryMetricsRes summaryFromDb =
                        getJobSummaryMetricsResByDb(jobInstance, Long.toString(jobEngineId));
                jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromDb);
            } else {
                // Non-terminal (or unknown) status and no engine metrics: ask the engine for
                // the current status and bring the stored records in line with it.
                String jobEngineIdText = String.valueOf(jobEngineId);
                JobStatus engineStatus = null;
                try {
                    engineStatus = getJobStatusByJobEngineId(jobEngineIdText);
                } catch (Exception e) {
                    // Best effort: keep processing the remaining instances, but record the
                    // cause so the failure can be diagnosed.
                    log.warn(
                            "getMetricsListIfTaskTypeIsStreaming getJobStatusByJobEngineId is exception jobInstanceId is : {}",
                            jobInstance.getId(),
                            e);
                }
                if (engineStatus != null) {
                    jobInstance.setJobStatus(engineStatus);
                    jobInstanceDao.update(jobInstance);
                    JobSummaryMetricsRes summaryFromDb =
                            getJobSummaryMetricsResByDb(jobInstance, jobEngineIdText);
                    jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromDb);
                    // Propagate the refreshed status to every stored metrics row.
                    List<JobMetrics> storedMetrics =
                            getJobMetricsFromDb(jobInstance, jobEngineIdText);
                    for (JobMetrics storedMetric : storedMetrics) {
                        storedMetric.setStatus(engineStatus);
                        jobMetricsDao.getJobMetricsMapper().updateById(storedMetric);
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    return jobSummaryMetricsResMap;
}