in agent/pluginmanager/pluginmanager.go [300:444]
// pluginHealthCheckPull gathers the heartbeat-based status of installed
// persistent plugins and posts it to the plugin health service.
//
// Flow:
//  1. Keep at least avoidTime seconds away from the last and the next
//     pluginHealthCheckScan: sleep if the last scan was too recent, and give
//     up if the next scan is due in less than avoidTime seconds.
//  2. For every persistent, non-removed plugin that has a "heartbeat" file in
//     its version directory, read the timestamp stored there and mark the
//     plugin PERSIST_FAIL when it is older than HeartbeatInterval+5 seconds,
//     PERSIST_RUNNING otherwise.
//  3. When lazyReport is on, skip the report if the name->status snapshot is
//     identical to the last successfully posted one.
//  4. Post the report, then apply the pull interval, scan interval and report
//     type returned by the server.
//
// All failures are logged and end the current run; nothing is returned.
func pluginHealthCheckPull() {
	pluginHealthCheckTimeMut.Lock()
	lastTime := lastPluginHealthCheckTime
	pluginHealthCheckTimeMut.Unlock()

	now := time.Now().Unix()
	needWait := int64(avoidTime) - (now - lastTime)
	if needWait > 0 {
		log.GetLogger().Infof("pluginHealthCheckPull: last pluginHealthCheckScan started [%d] seconds ago, need wait [%d] second for avoidTime[%d]", now-lastTime, needWait, avoidTime)
		time.Sleep(time.Duration(needWait) * time.Second)
		now = time.Now().Unix()
	}
	remainTime := lastTime + int64(pluginHealthScanInterval) - now
	if remainTime < int64(avoidTime) {
		log.GetLogger().Infof("pluginHealthCheckPull: next pluginHealthCheckScan will start in [%d] seconds, less than avoidTime[%d], so cancel this pluginHealthCheckPull", remainTime, avoidTime)
		return
	}
	log.GetLogger().Info("pluginHealthCheckPull: start")

	// 1. Load the installed plugin list; without plugins there is nothing to check.
	pluginInfoList, err := _findAllInstalledPlugins()
	if err != nil {
		log.GetLogger().Error("pluginHealthCheckPull: loadPlugins err: " + err.Error())
		return
	}
	if len(pluginInfoList) == 0 {
		log.GetLogger().Infof("pluginHealthCheckPull: there is no plugin")
		return
	}

	// 2. Collect plugin status.
	pluginStatusRequest := PluginStatusResquest{
		Plugin: []PluginStatus{},
	}
	pluginDir, err := pathutil.GetPluginPath()
	if err != nil {
		log.GetLogger().Error("pluginHealthCheckPull: getPluginPath err: ", err.Error())
		return
	}
	curPluginStatusRecord := map[string]string{}
	for _, pluginInfo := range pluginInfoList {
		// Only persistent plugins that have not been removed are health-checked.
		if pluginInfo.PluginType() != PLUGIN_PERSIST || pluginInfo.IsRemoved {
			continue
		}
		// The heartbeat file lives in the plugin's version directory.
		heartbeatPath := filepath.Join(pluginDir, pluginInfo.Name, pluginInfo.Version, "heartbeat")
		if !fileutil.CheckFileIsExist(heartbeatPath) {
			continue
		}
		content, err := ioutil.ReadFile(heartbeatPath)
		if err != nil {
			log.GetLogger().Errorf("pluginHealthCheckPull: Read heartbeat file err, heartbeat[%s], err: %s", heartbeatPath, err.Error())
			continue
		}
		timestampStr := strings.TrimSpace(string(content))
		// Parse as a full 64-bit value: bitSize 0 would limit the range to the
		// platform int and reject valid timestamps on 32-bit builds.
		timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
		if err != nil {
			log.GetLogger().Errorf("pluginHealthCheckPull: Parse heartbeat file err, heartbeat[%s], content[%s] err: %s", heartbeatPath, timestampStr, err.Error())
			continue
		}
		// A heartbeat older than HeartbeatInterval plus 5 seconds of slack
		// marks the plugin as failed.
		status := PERSIST_RUNNING
		if now-timestamp > int64(pluginInfo.HeartbeatInterval+5) {
			status = PERSIST_FAIL
		}
		curPluginStatusRecord[pluginInfo.Name] = status

		pluginStatus := PluginStatus{
			Name:    pluginInfo.Name,
			Status:  status,
			Version: pluginInfo.Version,
		}
		// Truncate over-long fields to the maximum lengths allowed in the report.
		if len(pluginStatus.Name) > PLUGIN_NAME_MAXLEN {
			pluginStatus.Name = pluginStatus.Name[:PLUGIN_NAME_MAXLEN]
		}
		if len(pluginStatus.Version) > PLUGIN_VERSION_MAXLEN {
			pluginStatus.Version = pluginStatus.Version[:PLUGIN_VERSION_MAXLEN]
		}
		pluginStatusRequest.Plugin = append(pluginStatusRequest.Plugin, pluginStatus)
	}
	if len(pluginStatusRequest.Plugin) == 0 {
		log.GetLogger().Infof("pluginHealthCheckPull: there is no persist plugin with heartbeat")
		return
	}

	willReport := true
	if lazyReport {
		// Lazy mode: report only when the current snapshot differs from the
		// last reported one (different size, different plugins or different status).
		willReport = false
		if len(curPluginStatusRecord) != len(lastPluginStatusRecord) {
			willReport = true
		} else {
			for name, status := range curPluginStatusRecord {
				if lastStatus, ok := lastPluginStatusRecord[name]; !ok || lastStatus != status {
					willReport = true
					break
				}
			}
		}
	}
	if !willReport {
		return
	}

	requestPayloadBytes, err := json.Marshal(pluginStatusRequest)
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckPull fail: pluginStatusList marshal fail")
		return
	}
	requestPayload := string(requestPayloadBytes)
	url := util.GetPluginHealthService()
	resp, err := util.HttpPost(url, requestPayload, "")
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckPull fail: post pluginStatusList fail")
		return
	}
	// Remember the snapshot only once the post has gone through; otherwise a
	// failed report would be treated as delivered and, in lazy mode, never
	// retried until the status changes again.
	lastPluginStatusRecord = curPluginStatusRecord

	pluginStatusResp, err := parsePluginHealthCheck(resp)
	if err != nil {
		log.GetLogger().WithError(err).Errorf("pluginHealthCheckPull fail: parse PluginStatusResponse from resp fail: %s", resp)
		return
	}

	// Apply the next pull interval requested by the server, if any.
	if pluginStatusResp.PullInterval > 0 {
		pluginHealthPullInterval = pluginStatusResp.PullInterval
	}
	if err := refreshTimer(pluginHealthPullTimer, pluginHealthPullInterval); err != nil {
		log.GetLogger().Errorf("pluginHealthCheckPull: refresh pluginHealthPullTimer nextInterval [%d] second failed: %s", pluginHealthPullInterval, err.Error())
	} else {
		log.GetLogger().Infof("pluginHealthCheckPull: refresh pluginHealthPullTimer nextInterval [%d] second", pluginHealthPullInterval)
	}
	if pluginStatusResp.ScanInterval > 0 {
		pluginHealthScanInterval = pluginStatusResp.ScanInterval
	}

	// pluginStatusResp.ReportType tells whether lazy reporting is enabled.
	switch {
	case pluginStatusResp.ReportType == NORMAL_REPORT && lazyReport:
		lazyReport = false
		log.GetLogger().Info("pluginHealthCheckPull: lazyReport switch to [off]")
	case pluginStatusResp.ReportType == LAZY_REPORT && !lazyReport:
		lazyReport = true
		log.GetLogger().Info("pluginHealthCheckPull: lazyReport switch to [on]")
	}
	log.GetLogger().Info("pluginHealthCheckPull success")
}