private Map getMatricsListIfTaskTypeIsStreaming()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java [253:366]


    /**
     * Builds the summary metrics of every given streaming job instance, keyed by job instance id.
     *
     * <p>Where the summary comes from depends on the status stored on the job instance:
     *
     * <ul>
     *   <li>{@code CANCELED}: always read through {@code getJobSummaryMetricsResByDb}; the engine
     *       snapshot is not consulted.
     *   <li>{@code FINISHED} or {@code FAILED}: taken from the engine snapshot when it still holds
     *       an entry for the job's engine id, otherwise read through {@code
     *       getJobSummaryMetricsResByDb}.
     *   <li>any other status (including {@code null}): taken from the engine snapshot when it
     *       holds an entry for the job's engine id. Otherwise the status is re-queried with
     *       {@code getJobStatusByJobEngineId}; if that yields a status, it is written to the job
     *       instance and to each of its stored metrics rows, and the summary is read through
     *       {@code getJobSummaryMetricsResByDb}. If it yields {@code null}, the instance gets no
     *       entry in the result.
     * </ul>
     *
     * <p>Whenever the engine snapshot is used, {@code modifyAndUpdateJobInstanceAndJobMetrics} is
     * invoked first for that instance.
     *
     * @param allJobInstance job instances to build summaries for
     * @param userId user id forwarded to the helper calls
     * @param allRunningJobMetricsFromEngine metrics obtained from the engine, keyed by job engine
     *     id
     * @param jobInstanceIdAndJobEngineIdMap maps a job instance id to its job engine id
     * @return summaries keyed by job instance id; a value is whatever the helper returned
     */
    private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
            List<JobInstance> allJobInstance,
            Integer userId,
            Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
            Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {

        HashMap<Long, JobSummaryMetricsRes> jobSummaryMetricsResMap = new HashMap<>();

        for (JobInstance jobInstance : allJobInstance) {
            // Every branch below needs the engine id, so resolve it once per instance.
            Long jobEngineId = jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId());

            // Constant-first equals also covers a null status.
            boolean canceled = "CANCELED".equals(jobInstance.getJobStatus());
            boolean finishedOrFailed =
                    "FINISHED".equals(jobInstance.getJobStatus())
                            || "FAILED".equals(jobInstance.getJobStatus());

            // Canceled jobs never look at the engine snapshot; all others prefer it when present.
            boolean reportedByEngine =
                    !canceled
                            && !allRunningJobMetricsFromEngine.isEmpty()
                            && allRunningJobMetricsFromEngine.containsKey(jobEngineId);

            if (reportedByEngine) {
                // Refresh the stored job instance / metrics first, then build the response from
                // the same engine snapshot.
                modifyAndUpdateJobInstanceAndJobMetrics(
                        jobInstance,
                        allRunningJobMetricsFromEngine,
                        jobInstanceIdAndJobEngineIdMap,
                        userId);
                JobSummaryMetricsRes summaryFromEngine =
                        getRunningJobMetricsFromEngine(
                                allRunningJobMetricsFromEngine,
                                jobInstanceIdAndJobEngineIdMap,
                                jobInstance);
                jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromEngine);

            } else if (canceled || finishedOrFailed) {
                // The job has ended and the engine has nothing for it: answer from the database.
                // NOTE(review): Long.toString unboxes, so an instance without an engine id in
                // jobInstanceIdAndJobEngineIdMap throws a NullPointerException here, while the
                // branch below turns it into the text "null" - confirm which is intended.
                JobSummaryMetricsRes summaryFromDb =
                        getJobSummaryMetricsResByDb(
                                jobInstance, userId, Long.toString(jobEngineId));
                jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromDb);

            } else {
                // The stored status is not a final one, yet the engine snapshot has no entry for
                // the job: ask for its current status and, if there is one, bring the stored
                // state in line with it.
                String jobEngineIdText = String.valueOf(jobEngineId);
                String jobStatusByJobEngineId = getJobStatusByJobEngineId(jobEngineIdText);
                if (jobStatusByJobEngineId != null) {
                    jobInstance.setJobStatus(jobStatusByJobEngineId);
                    jobInstanceDao.update(jobInstance);

                    JobSummaryMetricsRes summaryFromDb =
                            getJobSummaryMetricsResByDb(jobInstance, userId, jobEngineIdText);
                    jobSummaryMetricsResMap.put(jobInstance.getId(), summaryFromDb);

                    // Propagate the refreshed status to every stored metrics row of this job.
                    List<JobMetrics> jobMetricsFromDb =
                            getJobMetricsFromDb(jobInstance, userId, jobEngineIdText);
                    for (JobMetrics jobMetrics : jobMetricsFromDb) {
                        jobMetrics.setStatus(jobStatusByJobEngineId);
                        jobMetricsDao.getJobMetricsMapper().updateById(jobMetrics);
                    }
                }
            }
        }
        return jobSummaryMetricsResMap;
    }