in hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java [96:176]
public void collectorGoOnline(String identity, CollectorInfo collectorInfo) {
if (StringUtils.isBlank(identity)) {
log.error("identity can not be null if collector not existed");
return;
}
Collector collector = collectorDao.findCollectorByName(identity).orElse(null);
if (Objects.nonNull(collector)) {
if (collector.getStatus() == CommonConstants.COLLECTOR_STATUS_ONLINE) {
return;
}
collector.setStatus(CommonConstants.COLLECTOR_STATUS_ONLINE);
if (collectorInfo != null) {
collector.setIp(collectorInfo.getIp());
collector.setMode(collectorInfo.getMode());
collector.setVersion(collectorInfo.getVersion());
}
} else {
if (collectorInfo == null) {
log.error("collectorInfo can not null when collector not existed");
return;
}
collector = Collector.builder()
.name(identity)
.ip(collectorInfo.getIp())
.mode(collectorInfo.getMode())
.version(collectorInfo.getVersion())
.status(CommonConstants.COLLECTOR_STATUS_ONLINE)
.build();
}
collectorDao.save(collector);
ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(),
collector.getIp(), System.currentTimeMillis(), null);
consistentHash.addNode(node);
reBalanceCollectorAssignJobs();
// Read database The fixed collection tasks at this collector are delivered
List<CollectorMonitorBind> binds = collectorMonitorBindDao.findCollectorMonitorBindsByCollector(identity);
if (CollectionUtils.isEmpty(binds)){
return;
}
List<Monitor> monitors = monitorDao.findMonitorsByIdIn(binds.stream().map(CollectorMonitorBind::getMonitorId).collect(Collectors.toSet()));
for (Monitor monitor : monitors) {
if (Objects.isNull(monitor) || monitor.getStatus() == CommonConstants.MONITOR_PAUSED_CODE) {
continue;
}
try {
// build collect job entity
Job appDefine = appService.getAppDefine(monitor.getApp());
if (CommonConstants.PROMETHEUS.equals(monitor.getApp())) {
appDefine.setApp(CommonConstants.PROMETHEUS_APP_PREFIX + monitor.getName());
}
appDefine.setMonitorId(monitor.getId());
appDefine.setDefaultInterval(monitor.getIntervals());
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
List<Param> params = paramDao.findParamsByMonitorId(monitor.getId());
List<Configmap> configmaps = params.stream()
.map(param -> Configmap.builder()
.key(param.getField())
.value(param.getParamValue())
.type(param.getType()).build()).collect(Collectors.toList());
List<ParamDefine> paramDefaultValue = appDefine.getParams().stream()
.filter(item -> StringUtils.isNotBlank(item.getDefaultValue()))
.toList();
paramDefaultValue.forEach(defaultVar -> {
if (configmaps.stream().noneMatch(item -> item.getKey().equals(defaultVar.getField()))) {
configmaps.add(Configmap.builder()
.key(defaultVar.getField())
.value(defaultVar.getDefaultValue())
.type((byte) 1)
.build());
}
});
appDefine.setConfigmap(configmaps);
long jobId = addAsyncCollectJob(appDefine, identity);
monitor.setJobId(jobId);
monitorDao.save(monitor);
} catch (Exception e) {
log.error("insert pinned monitor job: {} in collector: {} error,continue next monitor", monitor, identity, e);
}
}
}