public void dispatchCollectData()

in hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java [201:319]


    /**
     * Handles the collected result of one metrics task and decides what the owning job does next.
     *
     * <p>The timeout-monitor entry registered for the finished task is removed first. When the metrics
     * was split into sub tasks, the response is merged through
     * {@link Metrics#consumeSubTaskResponse} and processing only continues once that call reports the
     * last sub task; the merged data then replaces {@code metricsData}. After that the job is asked
     * for the next metrics to collect and the result is routed to the cyclic or the one-time handling.
     *
     * @param timeout     time wheel handle whose task is the {@link WheelTimerTask} wrapping the job
     * @param metrics     the metrics definition whose collection has just finished
     * @param metricsData the data collected for {@code metrics}
     */
    public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
        WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
        Job job = timerJob.getJob();
        if (metrics.isHasSubTask()) {
            metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName() + "-sub-" + metrics.getSubTaskId());
            // Every sub task reports here; only the one flagged as last carries on with the merged data.
            if (!metrics.consumeSubTaskResponse(metricsData)) {
                return;
            }
            metricsData = metrics.getSubTaskDataRef().get().build();
        } else {
            metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName());
        }
        Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
        if (job.isCyclic()) {
            dispatchCyclicCollectData(timeout, timerJob, job, metrics, metricsData, metricsSet);
        } else {
            dispatchOneTimeCollectData(timeout, job, metricsData, metricsSet);
        }
    }

    /**
     * Cyclic job handling: either re-arms the job on the time wheel or starts the next level of
     * metrics, then forwards the collected data to the data queue.
     *
     * @param metricsSet next metrics to collect; {@code null} when nothing is left for this round,
     *                   empty while other metrics of the current level are still running
     */
    private void dispatchCyclicCollectData(Timeout timeout, WheelTimerTask timerJob, Job job, Metrics metrics,
                                           CollectRep.MetricsData metricsData, Set<Metrics> metricsSet) {
        if (log.isDebugEnabled()) {
            log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(), job.getApp(), metricsData.getMetrics());
            logCollectedValues(metricsData);
        }
        // If metricsSet is null, the execution of this round is completed.
        // If the availability metrics (priority 0) failed, the pending next-level metrics are skipped
        // and the job goes straight to the next round of scheduling.
        boolean isAvailableCollectFailed = metricsSet != null && !metricsSet.isEmpty()
                && metrics.getPriority() == (byte) 0 && metricsData.getCode() != CollectRep.Code.SUCCESS;
        if (metricsSet == null || isAvailableCollectFailed || job.isSd()) {
            // This round is finished: push the periodic task to the time wheel again, shortening the
            // delay by the whole seconds already spent since dispatch (never below zero).
            if (!timeout.isCancelled()) {
                long spendTime = System.currentTimeMillis() - job.getDispatchTime();
                long interval = Math.max(0L, job.getInterval() - spendTime / 1000);
                timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
            }
        } else if (!metricsSet.isEmpty()) {
            // The current level is completed, start the next level.
            submitNextLevelCyclicMetrics(timeout, job, metricsData, metricsSet);
        }
        // Otherwise (empty set) other metrics of the same level are still running; the one that
        // finishes last will move the job to the next level.

        // Service discovery jobs additionally publish a copy of the data on the discovery channel.
        if (job.isSd()) {
            CollectRep.MetricsData sdMetricsData = CollectRep.MetricsData.newBuilder(metricsData).build();
            commonDataQueue.sendServiceDiscoveryData(sdMetricsData);
        }
        commonDataQueue.sendMetricsData(metricsData);
    }

    /**
     * Starts the next level of a cyclic job, using the data just collected to fill placeholders in
     * the next metrics' configuration. A metrics containing placeholders that match keys of the
     * pre-collected config maps is fanned out into up to {@code MAX_SUB_TASK_NUM} sub tasks, one
     * per config map.
     */
    private void submitNextLevelCyclicMetrics(Timeout timeout, Job job, CollectRep.MetricsData metricsData,
                                              Set<Metrics> metricsSet) {
        List<Map<String, Configmap>> configmapList = CollectUtil.getConfigmapFromPreCollectData(metricsData);
        if (configmapList.size() == ENV_CONFIG_SIZE) {
            job.addEnvConfigmaps(configmapList.get(0));
        }
        for (Metrics metricItem : metricsSet) {
            Set<String> cryPlaceholderFields = CollectUtil.matchCryPlaceholderField(GSON.toJsonTree(metricItem));
            if (cryPlaceholderFields.isEmpty()) {
                // Nothing to substitute: collect the metrics as is.
                submitMetricsCollect(timeout, metricItem, job.getId() + "-" + metricItem.getName());
                continue;
            }
            boolean isSubTask = configmapList.stream()
                    .anyMatch(map -> map.keySet().stream().anyMatch(cryPlaceholderFields::contains));
            int subTaskNum = isSubTask ? Math.min(configmapList.size(), MAX_SUB_TASK_NUM) : 1;
            // Shared by all sub tasks of this metrics: the counter and the merged-data holder.
            AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum);
            AtomicReference<CollectRep.MetricsData.Builder> metricsDataReference = new AtomicReference<>();
            for (int index = 0; index < subTaskNum; index++) {
                // Job-level env values first, overridden by the values of this sub task's row.
                Map<String, Configmap> configmap = new HashMap<>(job.getEnvConfigmaps());
                if (isSubTask) {
                    configmap.putAll(configmapList.get(index));
                }
                Metrics metric = CollectUtil.replaceCryPlaceholderToMetrics(metricItem, configmap);
                metric.setSubTaskNum(subTaskNumAtomic);
                metric.setSubTaskId(index);
                metric.setSubTaskDataRef(metricsDataReference);
                submitMetricsCollect(timeout, metric, job.getId() + "-" + metric.getName() + "-sub-" + index);
            }
        }
    }

    /**
     * One-time job handling: accumulates the data in the job and, once every metrics has been
     * collected, hands the combined result to the sync-response listener.
     *
     * @param metricsSet next metrics to collect; {@code null} when the job is finished, empty while
     *                   other metrics of the current level are still running
     */
    private void dispatchOneTimeCollectData(Timeout timeout, Job job, CollectRep.MetricsData metricsData,
                                            Set<Metrics> metricsSet) {
        // Insert the current metrics data into the job for unified assembly.
        job.addCollectMetricsData(metricsData);
        if (log.isDebugEnabled()) {
            log.debug("One-time Job: {}", metricsData.getMetrics());
            logCollectedValues(metricsData);
        }
        if (job.isSd() || metricsSet == null) {
            // All metrics of this job are collected: notify the result listener with the combined data.
            timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp());
        } else if (!metricsSet.isEmpty()) {
            // The current level is completed, start the next level.
            // NOTE(review): unlike the cyclic path, no placeholder substitution / sub-task fan-out
            // happens here - confirm this is intentional.
            for (Metrics metricItem : metricsSet) {
                submitMetricsCollect(timeout, metricItem, job.getId() + "-" + metricItem.getName());
            }
        }
        // Otherwise (empty set) other metrics of the same level are still running; the one that
        // finishes last will move the job to the next level.
    }

    /**
     * Enqueues a collect task for {@code metricItem} and registers it in the timeout monitor map
     * under {@code monitorKey}; {@link #dispatchCollectData} removes the same key when the result
     * comes back.
     */
    private void submitMetricsCollect(Timeout timeout, Metrics metricItem, String monitorKey) {
        MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this,
                collectorIdentity, unitConvertList);
        // NOTE(review): the monitor entry is registered after the task is queued (original order kept);
        // if a task could ever finish before the put, the entry would be left behind - verify.
        jobRequestQueue.addJob(metricsCollect);
        metricsTimeoutMonitorMap.put(monitorKey, new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
    }

    /**
     * Writes every collected value next to its field name at debug level.
     * Callers are expected to check {@code log.isDebugEnabled()} first.
     */
    private void logCollectedValues(CollectRep.MetricsData metricsData) {
        // Fetch the field list once and pair values by position instead of indexOf(field) per value.
        List<CollectRep.Field> fields = metricsData.getFields();
        for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
            for (int index = 0; index < fields.size(); index++) {
                log.debug("Field-->{},Value-->{}", fields.get(index).getName(), valueRow.getColumns(index));
            }
        }
    }