in agent/pluginmanager/pluginmanager.go [137:298]
// pluginHealthCheckScan performs one plugin health-check pass:
//
//  1. records the start time of this pass in lastPluginHealthCheckTime;
//  2. lists the installed plugins and returns early when there are none;
//  3. collects the status of one-shot plugins directly, and of persistent
//     plugins by running `acs-plugin-manager --status`;
//  4. asynchronously tries to start persistent plugins that are not running
//     (those plugins are left out of this report);
//  5. posts the collected statuses to the plugin health service, retrying up
//     to three times on error;
//  6. applies the pull/scan intervals and the report mode returned by the
//     server.
//
// The function returns nothing. Most failures are logged and abort the pass;
// a failure to decode the `--status` output or to refresh the scan timer is
// logged and the pass continues.
func pluginHealthCheckScan() {
	pluginHealthCheckTimeMut.Lock()
	lastPluginHealthCheckTime = time.Now().Unix()
	pluginHealthCheckTimeMut.Unlock()
	log.GetLogger().Info("pluginHealthCheckScan: start")

	// 1. Load the plugin list; with no plugins there is nothing to check.
	pluginInfoList, err := _findAllInstalledPlugins()
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckScan: loadPlugins err: " + err.Error())
		return
	}
	if len(pluginInfoList) == 0 {
		log.GetLogger().Infof("pluginHealthCheckScan: there is no plugin")
		return
	}

	// Over-long name and version fields are truncated (by byte length)
	// before they are reported.
	truncate := func(ps *PluginStatus) {
		if len(ps.Name) > PLUGIN_NAME_MAXLEN {
			ps.Name = ps.Name[:PLUGIN_NAME_MAXLEN]
		}
		if len(ps.Version) > PLUGIN_VERSION_MAXLEN {
			ps.Version = ps.Version[:PLUGIN_VERSION_MAXLEN]
		}
	}

	// 2. Build the status report that is sent to the server.
	pluginStatusRequest := PluginStatusResquest{
		Plugin: []PluginStatus{},
	}
	persistPluginCount := 0
	pluginInfoMap := make(map[string]*PluginInfo)
	for _, pluginInfo := range pluginInfoList {
		if pluginInfo.IsRemoved {
			continue
		}
		// Take a per-iteration copy before storing its address: before
		// Go 1.22 the range variable is shared by all iterations, so storing
		// &pluginInfo would make every map entry point at the last element.
		info := pluginInfo
		pluginInfoMap[info.Name] = &info

		switch info.PluginType() {
		case PLUGIN_ONCE:
			pluginStatus := PluginStatus{
				Name:    info.Name,
				Status:  ONCE_INSTALLED,
				Version: info.Version,
			}
			truncate(&pluginStatus)
			pluginStatusRequest.Plugin = append(pluginStatusRequest.Plugin, pluginStatus)
		case PLUGIN_PERSIST:
			persistPluginCount++
		}
	}

	if persistPluginCount > 0 {
		// Call the status interface of acs-plugin-manager to fetch the state
		// of all persistent plugins in one batch (removed ones included).
		mixedOutput := bytes.Buffer{}
		cmd := "acs-plugin-manager"
		arguments := []string{"--status"}
		_, _, err = syncRunKillGroup("", cmd, arguments, &mixedOutput, &mixedOutput, 120)
		if err != nil {
			log.GetLogger().Errorf("pluginHealthCheckScan: cmd run err: %s, cmd[%s %s] output[%s]", err.Error(), cmd, strings.Join(arguments, " "), mixedOutput.String())
			return
		}
		content := mixedOutput.Bytes()
		pluginStatusList := []PluginStatus{}
		// A decode failure is only logged: the pass carries on with whatever
		// was decoded so that one-shot plugin statuses are still reported.
		if err := json.Unmarshal(content, &pluginStatusList); err != nil {
			log.GetLogger().Errorf("pluginHealthCheckScan: json.Unmarshal pluginStatusList error: %s, content: %s", err.Error(), string(content))
		}
		if len(pluginStatusList) == 0 {
			log.GetLogger().Infof("pluginHealthCheckScan: there is no persist plugin, content[%s]", string(content))
		}
		for _, st := range pluginStatusList {
			if st.Status == REMOVED {
				continue
			}
			if st.Status != PERSIST_RUNNING {
				// A persistent plugin in an abnormal state is not reported in
				// this pass; its status is reported separately after
				// acs-plugin-manager brings it up with --start.
				log.GetLogger().Warnf("plugin[%s] is not running, try to start it", st.Name)

				// Use the plugin's own timeout when it is set and numeric,
				// otherwise fall back to 60.
				timeout := 60
				if info, ok := pluginInfoMap[st.Name]; ok && info.Timeout != "" {
					if t, err := strconv.Atoi(info.Timeout); err == nil {
						timeout = t
					}
				}
				go func(pluginName string, timeout int) {
					// Random delay of up to 10 seconds before starting.
					randSleep := rand.Intn(10 * 1000)
					time.Sleep(time.Duration(randSleep) * time.Millisecond)
					command := "acs-plugin-manager"
					arguments := []string{"-e", "--local", "-P", pluginName, "-p", "--start"}
					if _, _, err := syncRunKillGroup("", command, arguments, nil, nil, timeout); err != nil {
						log.GetLogger().Errorf("pluginHealthCheckScan: start plugin[%s] failed: %s", pluginName, err.Error())
					}
				}(st.Name, timeout)
				continue
			}
			// Persistent plugins in the running state are reported.
			pluginStatus := PluginStatus{
				Name:    st.Name,
				Version: st.Version,
				Status:  st.Status,
			}
			truncate(&pluginStatus)
			pluginStatusRequest.Plugin = append(pluginStatusRequest.Plugin, pluginStatus)
		}
	}

	if len(pluginStatusRequest.Plugin) == 0 {
		log.GetLogger().Infof("pluginHealthCheckScan: there is no plugin need report status")
		return
	}
	requestPayloadBytes, err := json.Marshal(pluginStatusRequest)
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckScan: pluginStatusList marshal err: " + err.Error())
		return
	}
	requestPayload := string(requestPayloadBytes)
	url := util.GetPluginHealthService()

	// One initial attempt plus up to three retries, two seconds apart.
	resp, err := util.HttpPost(url, requestPayload, "")
	for i := 0; i < 3 && err != nil; i++ {
		log.GetLogger().Infof("pluginHealthCheckScan: upload pluginStatusList fail, need retry: %s", requestPayload)
		time.Sleep(time.Duration(2) * time.Second)
		resp, err = util.HttpPost(url, requestPayload, "")
	}
	if err != nil {
		log.GetLogger().WithError(err).Error("pluginHealthCheckScan: post pluginStatusList fail")
		return
	}
	pluginStatusResp, err := parsePluginHealthCheck(resp)
	if err != nil {
		log.GetLogger().WithError(err).Errorf("pluginHealthCheckScan: parse PluginStatusResponse from resp fail: %s", resp)
		return
	}

	// Apply the intervals for the next status check as returned by the server.
	if pluginStatusResp.PullInterval > 0 && pluginStatusResp.PullInterval != pluginHealthPullInterval {
		pluginHealthPullInterval = pluginStatusResp.PullInterval
	}
	if pluginStatusResp.ScanInterval > 0 && pluginStatusResp.ScanInterval != pluginHealthScanInterval {
		pluginHealthScanInterval = pluginStatusResp.ScanInterval
	}
	if err := refreshTimer(pluginHealthScanTimer, pluginHealthScanInterval); err != nil {
		log.GetLogger().Errorf("pluginHealthCheckScan: refresh pluginHealthScanTimer nextInterval [%d] second failed: %s", pluginHealthScanInterval, err.Error())
	} else {
		log.GetLogger().Infof("pluginHealthCheckScan: refresh pluginHealthScanTimer nextInterval [%d] second", pluginHealthScanInterval)
	}

	// Switch the lazy-report flag when the server asks for a different mode.
	if pluginStatusResp.ReportType == NORMAL_REPORT && lazyReport {
		lazyReport = false
		log.GetLogger().Info("pluginHealthCheckScan: lazyReport switch to [off]")
	} else if pluginStatusResp.ReportType == LAZY_REPORT && !lazyReport {
		lazyReport = true
		log.GetLogger().Info("pluginHealthCheckScan: lazyReport switch to [on]")
	}
	// if flowReport {
	// 	// A plugin start was triggered, so pluginHealthPullTimer needs to be reset in order to push the post-start status to the server promptly.
	// 	// However, if interval is too late (later than pluginHealthPullInterval or pluginHealthScanInterval) there is no need to reset pluginHealthPullTimer.
	// 	interval := 60
	// 	if pluginStatusResp.RefreshInterval > 0 {
	// 		interval = pluginStatusResp.RefreshInterval
	// 	}
	// 	if interval < pluginHealthPullInterval && interval < pluginHealthScanInterval {
	// 		pluginHealthPullTimer.Reset(time.Duration(interval) * time.Second)
	// 	}
	// }
	log.GetLogger().Info("pluginHealthCheckScan success")
}