public void collectorGoOnline()

in hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java [96:176]


    public void collectorGoOnline(String identity, CollectorInfo collectorInfo) {
        if (StringUtils.isBlank(identity)) {
            log.error("identity can not be null if collector not existed");
            return;
        }
        Collector collector = collectorDao.findCollectorByName(identity).orElse(null);
        if (Objects.nonNull(collector)) {
            if (collector.getStatus() == CommonConstants.COLLECTOR_STATUS_ONLINE) {
                return;
            }
            collector.setStatus(CommonConstants.COLLECTOR_STATUS_ONLINE);
            if (collectorInfo != null) {
                collector.setIp(collectorInfo.getIp());
                collector.setMode(collectorInfo.getMode());
                collector.setVersion(collectorInfo.getVersion());
            }
        } else {
            if (collectorInfo == null) {
                log.error("collectorInfo can not null when collector not existed");
                return;
            }
            collector = Collector.builder()
                    .name(identity)
                    .ip(collectorInfo.getIp())
                    .mode(collectorInfo.getMode())
                    .version(collectorInfo.getVersion())
                    .status(CommonConstants.COLLECTOR_STATUS_ONLINE)
                    .build();
        }
        collectorDao.save(collector);
        ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(),
                collector.getIp(), System.currentTimeMillis(), null);
        consistentHash.addNode(node);
        reBalanceCollectorAssignJobs();
        // Read database The fixed collection tasks at this collector are delivered
        List<CollectorMonitorBind> binds = collectorMonitorBindDao.findCollectorMonitorBindsByCollector(identity);
        if (CollectionUtils.isEmpty(binds)){
            return;
        }
        List<Monitor> monitors = monitorDao.findMonitorsByIdIn(binds.stream().map(CollectorMonitorBind::getMonitorId).collect(Collectors.toSet()));
        for (Monitor monitor : monitors) {
            if (Objects.isNull(monitor) || monitor.getStatus() == CommonConstants.MONITOR_PAUSED_CODE) {
                continue;
            }
            try {
                // build collect job entity
                Job appDefine = appService.getAppDefine(monitor.getApp());
                if (CommonConstants.PROMETHEUS.equals(monitor.getApp())) {
                    appDefine.setApp(CommonConstants.PROMETHEUS_APP_PREFIX + monitor.getName());
                }
                appDefine.setMonitorId(monitor.getId());
                appDefine.setDefaultInterval(monitor.getIntervals());
                appDefine.setCyclic(true);
                appDefine.setTimestamp(System.currentTimeMillis());
                List<Param> params = paramDao.findParamsByMonitorId(monitor.getId());
                List<Configmap> configmaps = params.stream()
                        .map(param -> Configmap.builder()
                                        .key(param.getField())
                                        .value(param.getParamValue())
                                        .type(param.getType()).build()).collect(Collectors.toList());
                List<ParamDefine> paramDefaultValue = appDefine.getParams().stream()
                        .filter(item -> StringUtils.isNotBlank(item.getDefaultValue()))
                        .toList();
                paramDefaultValue.forEach(defaultVar -> {
                    if (configmaps.stream().noneMatch(item -> item.getKey().equals(defaultVar.getField()))) {
                        configmaps.add(Configmap.builder()
                                .key(defaultVar.getField())
                                .value(defaultVar.getDefaultValue())
                                .type((byte) 1)
                                .build());
                    }
                });
                appDefine.setConfigmap(configmaps);
                long jobId = addAsyncCollectJob(appDefine, identity);
                monitor.setJobId(jobId);
                monitorDao.save(monitor);
            } catch (Exception e) {
                log.error("insert pinned monitor job: {} in collector: {} error,continue next monitor", monitor, identity, e);
            }
        }
    }