private Map getMatricsListIfTaskTypeIsBatch()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [147:214]


    /**
     * Collects the summary metrics of the given batch job instances, keyed by job instance id.
     *
     * <p>Per job instance, depending on its current job status:
     *
     * <ul>
     *   <li>{@code null}, {@code FAILED} or {@code RUNNING}: if the engine job id mapped to the
     *       instance is a key of {@code allRunningJobMetricsFromEngine}, the metrics are taken from
     *       that map and the job instance / job metrics are updated through {@code
     *       modifyAndUpdateJobInstanceAndJobMetrics}. Otherwise the metrics are read from the
     *       database and the instance is persisted with status {@code FINISHED}.
     *   <li>{@code FINISHED} or {@code CANCELED}: the metrics are read from the database.
     *   <li>Any other status: the instance is ignored.
     * </ul>
     *
     * <p>Instances that have no entry in {@code jobInstanceIdAndJobEngineIdMap} are skipped with a
     * warning instead of failing the whole batch, and metrics that cannot be found in the database
     * are left out of the result, so the returned map never holds a {@code null} value that came
     * from a database lookup.
     *
     * @param allJobInstance job instances to collect metrics for
     * @param userId id of the user, forwarded to the database lookup and to the update helper
     * @param allRunningJobMetricsFromEngine metrics returned from the engine, keyed by engine job id
     * @param jobInstanceIdAndJobEngineIdMap mapping from job instance id to engine job id
     * @return summary metrics keyed by job instance id
     */
    private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
            List<JobInstance> allJobInstance,
            Integer userId,
            Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
            Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {

        Map<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();

        // Hand the map itself to the logger so it is only rendered when INFO is enabled.
        log.info("allRunningJobMetricsFromEngine is {}", allRunningJobMetricsFromEngine);

        for (JobInstance jobInstance : allJobInstance) {
            log.info("jobEngineId={}", jobInstance.getJobEngineId());

            final String jobStatus = jobInstance.getJobStatus();
            final boolean mayBeOnEngine =
                    jobStatus == null || "FAILED".equals(jobStatus) || "RUNNING".equals(jobStatus);
            final boolean readFromDbOnly =
                    "FINISHED".equals(jobStatus) || "CANCELED".equals(jobStatus);
            if (!mayBeOnEngine && !readFromDbOnly) {
                // Any other status is not handled by this method.
                continue;
            }

            final Long jobInstanceId = jobInstance.getId();
            final Long jobEngineId = jobInstanceIdAndJobEngineIdMap.get(jobInstanceId);
            if (jobEngineId == null) {
                // Without an engine id neither the engine metrics nor the database metrics can be
                // looked up; unboxing the missing id would throw a NullPointerException.
                log.warn(
                        "No job engine id is mapped to job instance {}, its metrics are skipped",
                        jobInstanceId);
                continue;
            }

            if (mayBeOnEngine) {
                // containsKey already returns false for an empty map, no separate isEmpty check.
                if (allRunningJobMetricsFromEngine.containsKey(jobEngineId)) {
                    // The engine returned metrics for this job: use them and sync the database.
                    JobSummaryMetricsRes jobMetricsFromEngineRes =
                            getRunningJobMetricsFromEngine(
                                    allRunningJobMetricsFromEngine,
                                    jobInstanceIdAndJobEngineIdMap,
                                    jobInstance);
                    jobSummaryMetricsResMap.put(jobInstanceId, jobMetricsFromEngineRes);
                    modifyAndUpdateJobInstanceAndJobMetrics(
                            jobInstance,
                            allRunningJobMetricsFromEngine,
                            jobInstanceIdAndJobEngineIdMap,
                            userId);
                } else {
                    log.info(
                            "The job does not exist on the engine, it is directly returned from the database");
                    JobSummaryMetricsRes jobMetricsFromDb =
                            getJobSummaryMetricsResByDb(
                                    jobInstance, userId, jobEngineId.toString());
                    if (jobMetricsFromDb != null) {
                        jobSummaryMetricsResMap.put(jobInstanceId, jobMetricsFromDb);
                    }
                    // Persist the job instance with status FINISHED because the engine did not
                    // return it. Only the job instance row is updated here.
                    // NOTE(review): this also turns a FAILED instance into FINISHED - confirm that
                    // this is intended.
                    jobInstance.setJobStatus("FINISHED");
                    jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
                }
            } else {
                // FINISHED or CANCELED: the monitoring information is read from the database only.
                JobSummaryMetricsRes jobMetricsFromDb =
                        getJobSummaryMetricsResByDb(jobInstance, userId, jobEngineId.toString());
                log.info("jobStatus=finish oe canceled,JobSummaryMetricsRes={}", jobMetricsFromDb);
                // Same null guard as the branch above, so the result never maps an id to null.
                if (jobMetricsFromDb != null) {
                    jobSummaryMetricsResMap.put(jobInstanceId, jobMetricsFromDb);
                }
            }
        }

        return jobSummaryMetricsResMap;
    }