func pluginHealthCheckPull()

in agent/pluginmanager/pluginmanager.go [300:444]


// pluginHealthCheckPull reports the health status of installed persist plugins
// to the plugin health service and applies the scheduling settings returned by
// the service.
//
// The flow is:
//  1. Keep clear of pluginHealthCheckScan. Wait until at least avoidTime
//     seconds have passed since the last scan started. Cancel this pull
//     entirely if the next scan is due within avoidTime seconds.
//  2. For every persist plugin that is not marked removed and has a heartbeat
//     file, derive its status from the age of the timestamp in that file.
//  3. When lazyReport is on, skip the report unless the set of plugins or any
//     plugin's status differs from the last successfully delivered report.
//  4. POST the statuses, then apply PullInterval, ScanInterval and ReportType
//     from the response.
//
// Errors are logged, not returned; the function simply gives up for this round.
func pluginHealthCheckPull() {
	pluginHealthCheckTimeMut.Lock()
	lastTime := lastPluginHealthCheckTime
	pluginHealthCheckTimeMut.Unlock()
	now := time.Now().Unix()
	needWait := int64(avoidTime) - (now - lastTime)
	if needWait > 0 {
		log.GetLogger().Infof("pluginHealthCheckPull: last pluginHealthCheckScan started [%d] seconds ago, need wait [%d] second for avoidTime[%d]", now-lastTime, needWait, avoidTime)
		time.Sleep(time.Duration(needWait) * time.Second)
		now = time.Now().Unix()
	}
	remainTime := lastTime + int64(pluginHealthScanInterval) - now
	if remainTime < int64(avoidTime) {
		log.GetLogger().Infof("pluginHealthCheckPull: next pluginHealthCheckScan will start in [%d] seconds, less than avoidTime[%d], so cancel this pluginHealthCheckPull", remainTime, avoidTime)
		return
	}
	log.GetLogger().Info("pluginHealthCheckPull: start")

	// 1. Without any installed plugin there is nothing to health-check.
	pluginInfoList, err := _findAllInstalledPlugins()
	if err != nil {
		log.GetLogger().Error("pluginHealthCheckPull: loadPlugins err: " + err.Error())
		return
	}
	if len(pluginInfoList) == 0 {
		log.GetLogger().Infof("pluginHealthCheckPull: there is no plugin")
		return
	}

	// 2. Collect the status of every persist plugin that has a heartbeat file.
	pluginStatusRequest := PluginStatusResquest{
		Plugin: []PluginStatus{},
	}
	pluginDir, err := pathutil.GetPluginPath()
	if err != nil {
		log.GetLogger().Error("pluginHealthCheckPull: getPluginPath err: ", err.Error())
		return
	}
	// Plugin name -> status, used for lazy-report change detection.
	curPluginStatusRecord := map[string]string{}
	for _, pluginInfo := range pluginInfoList {
		// Only persist plugins that have not been removed report a heartbeat.
		if pluginInfo.PluginType() != PLUGIN_PERSIST || pluginInfo.IsRemoved {
			continue
		}
		heartbeatPath := filepath.Join(pluginDir, pluginInfo.Name, pluginInfo.Version, "heartbeat")
		if !fileutil.CheckFileIsExist(heartbeatPath) {
			continue
		}
		content, err := ioutil.ReadFile(heartbeatPath)
		if err != nil {
			log.GetLogger().Errorf("pluginHealthCheckPull: Read heartbeat file err, heartbeat[%s], err: %s", heartbeatPath, err.Error())
			continue
		}
		timestampStr := strings.TrimSpace(string(content))
		// NOTE(review): the heartbeat file presumably holds a Unix timestamp in
		// seconds, since it is compared against time.Now().Unix() below.
		timestamp, err := strconv.ParseInt(timestampStr, 10, 0)
		if err != nil {
			log.GetLogger().Errorf("pluginHealthCheckPull: Parse heartbeat file err, heartbeat[%s], content[%s] err: %s", heartbeatPath, timestampStr, err.Error())
			continue
		}
		// The plugin is considered failed when its last heartbeat is older
		// than its heartbeat interval plus a margin of 5 (assumed seconds).
		status := PERSIST_RUNNING
		if now-timestamp > int64(pluginInfo.HeartbeatInterval+5) {
			status = PERSIST_FAIL
		}
		curPluginStatusRecord[pluginInfo.Name] = status
		pluginStatus := PluginStatus{
			Name:    pluginInfo.Name,
			Status:  status,
			Version: pluginInfo.Version,
		}
		// Truncate fields so they fit the limits expected by the service.
		if len(pluginStatus.Name) > PLUGIN_NAME_MAXLEN {
			pluginStatus.Name = pluginStatus.Name[:PLUGIN_NAME_MAXLEN]
		}
		if len(pluginStatus.Version) > PLUGIN_VERSION_MAXLEN {
			pluginStatus.Version = pluginStatus.Version[:PLUGIN_VERSION_MAXLEN]
		}
		pluginStatusRequest.Plugin = append(pluginStatusRequest.Plugin, pluginStatus)
	}
	if len(pluginStatusRequest.Plugin) == 0 {
		log.GetLogger().Infof("pluginHealthCheckPull: there is no persist plugin with heartbeat")
		return
	}

	// 3. With lazy reporting on, only report when something changed since the
	// last delivered report.
	if lazyReport && !pluginStatusRecordChanged(curPluginStatusRecord, lastPluginStatusRecord) {
		return
	}

	// 4. Report and apply the settings returned by the service.
	requestPayloadBytes, err := json.Marshal(pluginStatusRequest)
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckPull fail: pluginStatusList marshal fail")
		return
	}
	requestPayload := string(requestPayloadBytes)
	url := util.GetPluginHealthService()
	resp, err := util.HttpPost(url, requestPayload, "")
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckPull fail: post pluginStatusList fail")
		return
	}
	// Record the statuses only after the report was actually delivered.
	// Recording them before marshal/post would let lazy reporting treat an
	// undelivered report as "already sent" and suppress every retry until
	// some status changed again.
	lastPluginStatusRecord = curPluginStatusRecord

	pluginStatusResp, err := parsePluginHealthCheck(resp)
	if err != nil {
		log.GetLogger().WithError(err).Errorf("pluginHealthCheckPull fail: parse PluginStatusResponse from resp fail: %s", resp)
		return
	}

	// Apply the next pull interval; a non-positive value keeps the current one.
	if pluginStatusResp.PullInterval > 0 {
		pluginHealthPullInterval = pluginStatusResp.PullInterval
	}
	if err := refreshTimer(pluginHealthPullTimer, pluginHealthPullInterval); err != nil {
		log.GetLogger().Errorf("pluginHealthCheckPull: refresh pluginHealthPullTimer nextInterval [%d] second failed: %s", pluginHealthPullInterval, err.Error())
	} else {
		log.GetLogger().Infof("pluginHealthCheckPull: refresh pluginHealthPullTimer nextInterval [%d] second", pluginHealthPullInterval)
	}
	if pluginStatusResp.ScanInterval > 0 {
		pluginHealthScanInterval = pluginStatusResp.ScanInterval
	}
	// ReportType tells whether lazy reporting should be on or off.
	if pluginStatusResp.ReportType == NORMAL_REPORT && lazyReport {
		lazyReport = false
		log.GetLogger().Info("pluginHealthCheckPull: lazyReport switch to [off]")
	} else if pluginStatusResp.ReportType == LAZY_REPORT && !lazyReport {
		lazyReport = true
		log.GetLogger().Info("pluginHealthCheckPull: lazyReport switch to [on]")
	}
	log.GetLogger().Info("pluginHealthCheckPull success")
}

// pluginStatusRecordChanged reports whether cur differs from last, meaning a
// plugin was added or removed, or some plugin's status changed. Both maps go
// from plugin name to status; a nil map is treated as empty.
func pluginStatusRecordChanged(cur, last map[string]string) bool {
	if len(cur) != len(last) {
		return true
	}
	for name, status := range cur {
		if lastStatus, ok := last[name]; !ok || lastStatus != status {
			return true
		}
	}
	return false
}