private Map getMatricsListIfTaskTypeIsStreaming()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [252:374]


    /**
     * Builds the summary metrics for every given streaming job instance, keyed by job instance id.
     *
     * <p>For each instance the source of the summary is chosen in this order:
     *
     * <ol>
     *   <li>Status {@code CANCELED}: the summary comes from {@code getJobSummaryMetricsResByDb}.
     *   <li>Any other status (including {@code null}) whose engine id is a key of {@code
     *       allRunningJobMetricsFromEngine}: the instance is handed to {@code
     *       modifyAndUpdateJobInstanceAndJobMetrics} and the summary comes from {@code
     *       getRunningJobMetricsFromEngine}.
     *   <li>Status {@code FINISHED} or {@code FAILED} with no engine entry: the summary comes from
     *       {@code getJobSummaryMetricsResByDb}.
     *   <li>Otherwise: the status is looked up with {@code getJobStatusByJobEngineId}. When a
     *       status is returned it is written to the instance and to its stored metrics, and the
     *       summary comes from {@code getJobSummaryMetricsResByDb}. When the lookup fails or
     *       returns {@code null}, the instance gets no entry in the result.
     * </ol>
     *
     * @param allJobInstance job instances to build summaries for
     * @param allRunningJobMetricsFromEngine metrics reported by the engine, keyed by job engine id
     * @param jobInstanceIdAndJobEngineIdMap job instance id to job engine id
     * @return job instance id to summary metrics; instances handled by the last case above may be
     *     absent
     * @throws RuntimeException wrapping any exception raised while processing an instance, except
     *     a failure of the engine status lookup, which is only logged
     */
    private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
            List<JobInstance> allJobInstance,
            Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
            Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {

        HashMap<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();

        for (JobInstance jobInstance : allJobInstance) {
            try {
                JobStatus currentStatus = jobInstance.getJobStatus();
                Long jobInstanceId = jobInstance.getId();
                // Resolved once per instance; null when the instance has no engine id mapping.
                Long jobEngineId = jobInstanceIdAndJobEngineIdMap.get(jobInstanceId);

                if (currentStatus == JobStatus.CANCELED) {
                    // Cancelled jobs are served from the database only.
                    // Long.toString(long) unboxes the id, so a missing mapping fails here with a
                    // NullPointerException (rethrown below) instead of querying with "null".
                    jobSummaryMetricsResMap.put(
                            jobInstanceId,
                            getJobSummaryMetricsResByDb(jobInstance, Long.toString(jobEngineId)));

                } else if (!allRunningJobMetricsFromEngine.isEmpty()
                        && allRunningJobMetricsFromEngine.containsKey(jobEngineId)) {
                    // The engine reported metrics for this job: let
                    // modifyAndUpdateJobInstanceAndJobMetrics process them first, then build the
                    // data returned to the front-end from the same engine metrics.
                    modifyAndUpdateJobInstanceAndJobMetrics(
                            jobInstance,
                            allRunningJobMetricsFromEngine,
                            jobInstanceIdAndJobEngineIdMap);
                    jobSummaryMetricsResMap.put(
                            jobInstanceId,
                            getRunningJobMetricsFromEngine(
                                    allRunningJobMetricsFromEngine,
                                    jobInstanceIdAndJobEngineIdMap,
                                    jobInstance));

                } else if (currentStatus == JobStatus.FINISHED
                        || currentStatus == JobStatus.FAILED) {
                    // Finished/failed job the engine no longer reports: fall back to the database.
                    // As in the cancelled case, a missing engine id mapping throws here.
                    jobSummaryMetricsResMap.put(
                            jobInstanceId,
                            getJobSummaryMetricsResByDb(jobInstance, Long.toString(jobEngineId)));

                } else {
                    // Not in a known end state and not reported by the engine: ask for the
                    // current status. String.valueOf yields "null" for a missing mapping.
                    String jobEngineIdText = String.valueOf(jobEngineId);
                    JobStatus engineStatus = null;
                    try {
                        engineStatus = getJobStatusByJobEngineId(jobEngineIdText);
                    } catch (Exception e) {
                        // Best effort: a failed lookup skips this instance instead of failing the
                        // whole listing. The exception is passed along so its cause is logged.
                        log.warn(
                                "getMetricsListIfTaskTypeIsStreaming getJobStatusByJobEngineId is exception jobInstanceId is : {}",
                                jobInstanceId,
                                e);
                    }

                    if (engineStatus != null) {
                        jobInstance.setJobStatus(engineStatus);
                        jobInstanceDao.update(jobInstance);
                        jobSummaryMetricsResMap.put(
                                jobInstanceId,
                                getJobSummaryMetricsResByDb(jobInstance, jobEngineIdText));

                        // Bring the stored per-pipeline metrics in line with the new status.
                        for (JobMetrics jobMetrics :
                                getJobMetricsFromDb(jobInstance, jobEngineIdText)) {
                            jobMetrics.setStatus(engineStatus);
                            jobMetricsDao.getJobMetricsMapper().updateById(jobMetrics);
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return jobSummaryMetricsResMap;
    }