in hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java [82:156]
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try (final CollectRep.MetricsData metricsData = dataQueue.pollServiceDiscoveryData()) {
Long monitorId = metricsData.getId();
final Monitor mainMonitor = monitorDao.findById(monitorId).orElse(null);
if (mainMonitor == null) {
log.warn("No monitor found for id {}", monitorId);
continue;
}
// collector
final Optional<CollectorMonitorBind> collectorBind = collectorMonitorBindDao.findCollectorMonitorBindByMonitorId(monitorId);
String collector = collectorBind.map(CollectorMonitorBind::getCollector).orElse(null);
// params
List<Param> mainMonitorParams = paramDao.findParamsByMonitorId(monitorId);
final Map<String, MonitorBind> subMonitorBindMap = monitorBindDao.findMonitorBindsByBizId(monitorId)
.stream().collect(Collectors.toMap(MonitorBind::getKeyStr, item -> item));
RowWrapper rowWrapper = metricsData.readRow();
Map<String, String> fieldsValue = Maps.newHashMapWithExpectedSize(8);
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
fieldsValue.clear();
rowWrapper.cellStream().forEach(cell -> {
String value = cell.getValue();
fieldsValue.put(cell.getField().getName(), value);
});
final String host = fieldsValue.get(FILED_HOST);
final String port = fieldsValue.get(FILED_PORT);
final String keyStr = host + ":" + port;
if (subMonitorBindMap.containsKey(keyStr)) {
subMonitorBindMap.remove(keyStr);
continue;
}
Monitor newMonitor = mainMonitor.clone();
newMonitor.setId(null);
newMonitor.setHost(host);
newMonitor.setName(newMonitor.getName() + "-" + host + ":" + port);
newMonitor.setScrape(CommonConstants.SCRAPE_STATIC);
newMonitor.setGmtCreate(LocalDateTime.now());
newMonitor.setGmtUpdate(LocalDateTime.now());
// replace host port
List<Param> newParams = new LinkedList<>();
for (Param param : mainMonitorParams) {
Param newParam = param.clone();
newParam.setId(null);
newParam.setGmtUpdate(null);
newParam.setGmtCreate(null);
if (FILED_HOST.equals(newParam.getField())) {
newParam.setParamValue(host);
} else if (FILED_PORT.equals(newParam.getField())) {
newParam.setParamValue(port);
}
newParams.add(newParam);
}
monitorService.addMonitor(newMonitor, newParams, collector, null);
MonitorBind monitorBind = MonitorBind.builder()
.bizId(monitorId)
.monitorId(newMonitor.getId())
.keyStr(keyStr)
.build();
monitorBindDao.save(monitorBind);
}
// hostMonitorMap only contains monitors which are already existed but not in service discovery data
// due to monitors that coincide with service discovery data are removed.
// Thus, all monitors still in hostMonitorMap need to be cancelled.
final Set<Long> needCancelMonitorIdSet = subMonitorBindMap.values().stream()
.map(MonitorBind::getMonitorId).collect(Collectors.toSet());
monitorService.cancelManageMonitors(needCancelMonitorIdSet);
for (Long id : needCancelMonitorIdSet) {
monitorBindDao.deleteMonitorBindByBizIdAndMonitorId(monitorId, id);
}
} catch (Exception exception) {
log.error(exception.getMessage(), exception);
}
}
}