in hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java [201:319]
public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
if (metrics.isHasSubTask()) {
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName() + "-sub-" + metrics.getSubTaskId());
boolean isLastTask = metrics.consumeSubTaskResponse(metricsData);
if (isLastTask) {
metricsData = metrics.getSubTaskDataRef().get().build();
} else {
return;
}
} else {
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName());
}
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
if (log.isDebugEnabled()) {
log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(), job.getApp(), metricsData.getMetrics());
for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
for (CollectRep.Field field : metricsData.getFields()) {
log.debug("Field-->{},Value-->{}", field.getName(), valueRow.getColumns(metricsData.getFields().indexOf(field)));
}
}
}
// If metricsSet is null, it means that the execution is completed or whether the priority of the collection metrics is 0, that is, the availability collection metrics.
// If the availability collection fails, the next metrics scheduling will be cancelled and the next round of scheduling will be entered directly.
boolean isAvailableCollectFailed = metricsSet != null && !metricsSet.isEmpty()
&& metrics.getPriority() == (byte) 0 && metricsData.getCode() != CollectRep.Code.SUCCESS;
if (metricsSet == null || isAvailableCollectFailed || job.isSd()) {
// The collection and execution task of this job are completed.
// The periodic task pushes the task to the time wheel again.
// First, determine the execution time of the task and the task collection interval.
if (!timeout.isCancelled()) {
long spendTime = System.currentTimeMillis() - job.getDispatchTime();
long interval = job.getInterval() - spendTime / 1000;
interval = interval <= 0 ? 0 : interval;
timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
}
} else if (!metricsSet.isEmpty()) {
// The execution of the current level metrics is completed, and the execution of the next level metrics starts
// use pre collect metrics data to replace next metrics config params
List<Map<String, Configmap>> configmapList = CollectUtil.getConfigmapFromPreCollectData(metricsData);
if (configmapList.size() == ENV_CONFIG_SIZE) {
job.addEnvConfigmaps(configmapList.get(0));
}
for (Metrics metricItem : metricsSet) {
Set<String> cryPlaceholderFields = CollectUtil.matchCryPlaceholderField(GSON.toJsonTree(metricItem));
if (cryPlaceholderFields.isEmpty()) {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
continue;
}
boolean isSubTask = configmapList.stream().anyMatch(map -> map.keySet().stream().anyMatch(cryPlaceholderFields::contains));
int subTaskNum = isSubTask ? Math.min(configmapList.size(), MAX_SUB_TASK_NUM) : 1;
AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum);
AtomicReference<CollectRep.MetricsData.Builder> metricsDataReference = new AtomicReference<>();
for (int index = 0; index < subTaskNum; index++) {
Map<String, Configmap> configmap = new HashMap<>(job.getEnvConfigmaps());
if (isSubTask) {
Map<String, Configmap> preConfigMap = configmapList.get(index);
configmap.putAll(preConfigMap);
}
Metrics metric = CollectUtil.replaceCryPlaceholderToMetrics(metricItem, configmap);
metric.setSubTaskNum(subTaskNumAtomic);
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metric, timeout));
}
}
} else {
// The list of metrics at the current execution level has not been fully executed.
// It needs to wait for the execution of other metrics task of the same level to complete the execution and enter the next level for execution.
}
// If it is an asynchronous periodic cyclic task, directly response the collected data
if (job.isSd()) {
CollectRep.MetricsData sdMetricsData = CollectRep.MetricsData.newBuilder(metricsData).build();
commonDataQueue.sendServiceDiscoveryData(sdMetricsData);
}
commonDataQueue.sendMetricsData(metricsData);
} else {
// If it is a temporary one-time task, you need to wait for the collected data of all metrics task to be packaged and returned.
// Insert the current metrics data into the job for unified assembly
job.addCollectMetricsData(metricsData);
if (log.isDebugEnabled()) {
log.debug("One-time Job: {}", metricsData.getMetrics());
for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
for (CollectRep.Field field : metricsData.getFields()) {
log.debug("Field-->{},Value-->{}", field.getName(), valueRow.getColumns(metricsData.getFields().indexOf(field)));
}
}
}
if (job.isSd() || metricsSet == null) {
// The collection and execution of all metrics of this job are completed
// and the result listener is notified of the combination of all metrics data
timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp());
} else if (!metricsSet.isEmpty()) {
// The execution of the current level metrics is completed, and the execution of the next level metrics starts
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
});
} else {
// The list of metrics task at the current execution level has not been fully executed.
// It needs to wait for the execution of other metrics task of the same level to complete the execution and enter the next level for execution.
}
}
}