private Map getMatricsListIfTaskTypeIsBatch()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [150:216]


    private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
            List<JobInstance> allJobInstance,
            Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
            Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {

        HashMap<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();

        log.info("allRunningJobMetricsFromEngine is {}", allRunningJobMetricsFromEngine.toString());

        // Traverse all jobInstances in allJobInstance
        for (JobInstance jobInstance : allJobInstance) {
            log.info("jobEngineId={}", jobInstance.getJobEngineId());

            if (jobInstance.getJobStatus() == null
                    || jobInstance.getJobStatus() == JobStatus.FAILED
                    || jobInstance.getJobStatus() == JobStatus.RUNNING) {
                // Obtain monitoring information from the collection of running jobs returned from
                // the engine
                if (!allRunningJobMetricsFromEngine.isEmpty()
                        && allRunningJobMetricsFromEngine.containsKey(
                                jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()))) {
                    JobSummaryMetricsRes jobMetricsFromEngineRes =
                            getRunningJobMetricsFromEngine(
                                    allRunningJobMetricsFromEngine,
                                    jobInstanceIdAndJobEngineIdMap,
                                    jobInstance);
                    jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
                    modifyAndUpdateJobInstanceAndJobMetrics(
                            jobInstance,
                            allRunningJobMetricsFromEngine,
                            jobInstanceIdAndJobEngineIdMap);

                } else {
                    log.info(
                            "The job does not exist on the engine, it is directly returned from the database");
                    JobSummaryMetricsRes jobMetricsFromDb =
                            getJobSummaryMetricsResByDb(
                                    jobInstance,
                                    Long.toString(
                                            jobInstanceIdAndJobEngineIdMap.get(
                                                    jobInstance.getId())));
                    if (jobMetricsFromDb != null) {
                        jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
                    }
                    if (jobInstance.getJobStatus() == JobStatus.RUNNING) {
                        // Set the job status of jobInstance and jobMetrics in the database to
                        // finished
                        jobInstance.setJobStatus(JobStatus.FINISHED);
                        jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
                    }
                }
            } else if (jobInstance.getJobStatus() == JobStatus.FINISHED
                    || jobInstance.getJobStatus() == JobStatus.CANCELED) {
                // If the status of the job is finished or cancelled, the monitoring information is
                // directly obtained from MySQL
                JobSummaryMetricsRes jobMetricsFromDb =
                        getJobSummaryMetricsResByDb(
                                jobInstance,
                                Long.toString(
                                        jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())));
                log.info("jobStatus=finish oe canceled,JobSummaryMetricsRes={}", jobMetricsFromDb);
                jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
            }
        }

        return jobSummaryMetricsResMap;
    }