public void run()

in hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java [82:156]


        public void run() {
            // Worker loop: takes one batch of service discovery data at a time and reconciles the
            // sub monitors bound to the owning (main) monitor with the targets listed in that batch:
            //   - a target that already has a bind is kept as is,
            //   - a target without a bind gets a new monitor cloned from the main monitor plus a bind,
            //   - a bind whose target is no longer listed gets its monitor cancelled and is deleted.
            // The loop runs until the current thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try (final CollectRep.MetricsData metricsData = dataQueue.pollServiceDiscoveryData()) {
                    if (metricsData == null) {
                        // NOTE(review): guards against a queue implementation that returns null
                        // instead of blocking - confirm against the queue contract.
                        continue;
                    }
                    final Long monitorId = metricsData.getId();
                    final Monitor mainMonitor = monitorDao.findById(monitorId).orElse(null);
                    if (mainMonitor == null) {
                        log.warn("No monitor found for id {}", monitorId);
                        continue;
                    }
                    // collector bound to the main monitor (null when there is no bind)
                    final Optional<CollectorMonitorBind> collectorBind = collectorMonitorBindDao.findCollectorMonitorBindByMonitorId(monitorId);
                    final String collector = collectorBind.map(CollectorMonitorBind::getCollector).orElse(null);
                    // params of the main monitor, used as the template for every new sub monitor
                    final List<Param> mainMonitorParams = paramDao.findParamsByMonitorId(monitorId);
                    // Existing binds keyed by "host:port". The merge function keeps the first bind when
                    // two stored binds share a key, so one duplicated record cannot abort every cycle
                    // with an IllegalStateException.
                    final Map<String, MonitorBind> subMonitorBindMap = monitorBindDao.findMonitorBindsByBizId(monitorId)
                            .stream().collect(Collectors.toMap(MonitorBind::getKeyStr, item -> item, (first, duplicate) -> first));
                    // Keys already handled in this batch. subMonitorBindMap is only a snapshot taken
                    // before the loop, so without this set a target listed twice in one batch would
                    // get two monitors (or be re-created right after its bind was matched).
                    final Set<String> handledKeys = new java.util.HashSet<>();
                    final Map<String, String> fieldsValue = Maps.newHashMapWithExpectedSize(8);
                    RowWrapper rowWrapper = metricsData.readRow();
                    while (rowWrapper.hasNextRow()) {
                        rowWrapper = rowWrapper.nextRow();
                        // the map is reused for every row
                        fieldsValue.clear();
                        rowWrapper.cellStream().forEach(cell -> fieldsValue.put(cell.getField().getName(), cell.getValue()));
                        final String host = fieldsValue.get(FILED_HOST);
                        final String port = fieldsValue.get(FILED_PORT);
                        final String keyStr = host + ":" + port;
                        if (!handledKeys.add(keyStr)) {
                            // same target listed more than once in this batch
                            continue;
                        }
                        if (subMonitorBindMap.remove(keyStr) != null) {
                            // target already has a sub monitor; dropping it from the map keeps it
                            // out of the cancel step below
                            continue;
                        }
                        // new target: clone the main monitor and point the clone at the target
                        final LocalDateTime now = LocalDateTime.now();
                        final Monitor newMonitor = mainMonitor.clone();
                        newMonitor.setId(null);
                        newMonitor.setHost(host);
                        newMonitor.setName(newMonitor.getName() + "-" + keyStr);
                        newMonitor.setScrape(CommonConstants.SCRAPE_STATIC);
                        newMonitor.setGmtCreate(now);
                        newMonitor.setGmtUpdate(now);
                        // copy the params, replacing host and port with the target's values
                        final List<Param> newParams = new LinkedList<>();
                        for (Param param : mainMonitorParams) {
                            final Param newParam = param.clone();
                            newParam.setId(null);
                            newParam.setGmtUpdate(null);
                            newParam.setGmtCreate(null);
                            if (FILED_HOST.equals(newParam.getField())) {
                                newParam.setParamValue(host);
                            } else if (FILED_PORT.equals(newParam.getField())) {
                                newParam.setParamValue(port);
                            }
                            newParams.add(newParam);
                        }
                        monitorService.addMonitor(newMonitor, newParams, collector, null);
                        // newMonitor.getId() is read after addMonitor; presumably addMonitor assigns it - confirm
                        final MonitorBind monitorBind = MonitorBind.builder()
                                .bizId(monitorId)
                                .monitorId(newMonitor.getId())
                                .keyStr(keyStr)
                                .build();
                        monitorBindDao.save(monitorBind);
                    }
                    // subMonitorBindMap now only contains binds whose target did not appear in the
                    // service discovery data, because every bind that matched a row was removed above.
                    // Thus, all monitors still referenced by subMonitorBindMap need to be cancelled.
                    final Set<Long> needCancelMonitorIdSet = subMonitorBindMap.values().stream()
                            .map(MonitorBind::getMonitorId).collect(Collectors.toSet());
                    monitorService.cancelManageMonitors(needCancelMonitorIdSet);
                    for (Long id : needCancelMonitorIdSet) {
                        monitorBindDao.deleteMonitorBindByBizIdAndMonitorId(monitorId, id);
                    }
                } catch (Exception exception) {
                    if (exception instanceof InterruptedException) {
                        // Throwing InterruptedException clears the thread's interrupt flag. Re-assert it
                        // and leave the loop; otherwise the loop condition would never observe the
                        // interruption and the worker could not be stopped.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    // Fixed format string: the exception message may be null or contain "{}".
                    log.error("Failed to process service discovery data: {}", exception.getMessage(), exception);
                }
            }
        }