agent/pluginmanager/acspluginmanager/acspluginmanager.go (1,192 lines of code) (raw):

package acspluginmanager import ( "archive/zip" "context" "errors" "fmt" "os" "path/filepath" "strconv" "strings" "time" "github.com/rodaine/table" "github.com/aliyun/aliyun_assist_client/agent/log" "github.com/aliyun/aliyun_assist_client/agent/metrics" . "github.com/aliyun/aliyun_assist_client/agent/pluginmanager" "github.com/aliyun/aliyun_assist_client/agent/util" "github.com/aliyun/aliyun_assist_client/agent/util/osutil" "github.com/aliyun/aliyun_assist_client/agent/util/process" "github.com/aliyun/aliyun_assist_client/agent/util/versionutil" "github.com/aliyun/aliyun_assist_client/common/filelock" "github.com/aliyun/aliyun_assist_client/common/fileutil" "github.com/aliyun/aliyun_assist_client/common/fuzzyjson" "github.com/aliyun/aliyun_assist_client/common/pathutil" "github.com/aliyun/aliyun_assist_client/common/zipfile" ) type pluginConfig struct { Name string `json:"name"` Arch string `json:"arch"` OsType string `json:"osType"` RunPath string `json:"runPath"` Timeout string `json:"timeout"` Publisher string `json:"publisher"` Version string `json:"version"` PluginType_ interface{} `json:"pluginType"` HeartbeatInterval int `json:"heartbeatInterval"` AddSysTag bool `json:"addSysTag"` pluginTypeStr string } func (pc *pluginConfig) PluginType() string { if pc.pluginTypeStr == "" { switch pc.PluginType_.(type) { case string: pt, _ := pc.PluginType_.(string) if pt == PLUGIN_ONCE { pc.pluginTypeStr = PLUGIN_ONCE } else if pt == PLUGIN_PERSIST { pc.pluginTypeStr = PLUGIN_PERSIST } else if pt == PLUGIN_COMMANDER { pc.pluginTypeStr = PLUGIN_COMMANDER } else { pc.pluginTypeStr = PLUGIN_UNKNOWN } case float64: pt, _ := pc.PluginType_.(float64) if pt == float64(PLUGIN_ONCE_INT) { pc.pluginTypeStr = PLUGIN_ONCE } else if pt == float64(PLUGIN_PERSIST_INT) { pc.pluginTypeStr = PLUGIN_PERSIST } else if pt == float64(PLUGIN_COMMANDER_INT) { pc.pluginTypeStr = PLUGIN_COMMANDER } else { pc.pluginTypeStr = PLUGIN_UNKNOWN } case nil: pc.pluginTypeStr = PLUGIN_ONCE default: pc.pluginTypeStr = PLUGIN_UNKNOWN } } return pc.pluginTypeStr } type PluginManager struct { Verbose bool Yes bool pluginRoot string } func NewPluginManager(verbose bool) (*PluginManager, error) { pluginRoot, err := pathutil.GetPluginPath() if err != nil { return nil, err } return &PluginManager{ Verbose: verbose, Yes: true, pluginRoot: pluginRoot, }, nil } func printPluginInfo(pluginInfoList *[]PluginInfo) { tbl := table.New("Name", "Version", "Publisher", "OsType", "Arch", "PluginType") for i := 0; i < len(*pluginInfoList); i++ { pluginInfo := (*pluginInfoList)[i] // 已删除的插件不打印 if pluginInfo.IsRemoved { continue } tbl.AddRow(pluginInfo.Name, pluginInfo.Version, pluginInfo.Publisher, pluginInfo.OSType, pluginInfo.Arch, pluginInfo.PluginType()) } tbl.Print() fmt.Println() } func getOnlinePluginInfo(packageName, version string) (archMatch *PluginInfo, archNotMatch []string, err error) { // request all arch pluginInfos var pluginList []PluginInfo pluginList, err = FetchPackageInfo(log.GetLogger(), packageName, version, false) if err != nil { return nil, nil, err } localArch, _ := GetArch() for idx, plugin := range pluginList { if plugin.Name == packageName { plugin.Arch = strings.ToLower(plugin.Arch) if plugin.Arch == "" || plugin.Arch == "all" || localArch == plugin.Arch { if archMatch == nil { archMatch = &pluginList[idx] // if plugin.Version > archMatch.Version, update archMatch } else if versionutil.CompareVersion(plugin.Version, archMatch.Version) > 0 { archMatch = &pluginList[idx] } } else { archNotMatch = append(archNotMatch, plugin.Arch) } } } return } func (pm *PluginManager) List(pluginName string, local bool) (exitCode int, err error) { var pluginInfoList []PluginInfo funcName := "List" exitCode = SUCCESS if local { if pluginName != "" { pluginInfoList, err = getInstalledPluginsByName(pluginName) } else { pluginInfoList, err = getAllInstalledPlugins() } if err != nil { exitCode, _ = errProcess(funcName, LOAD_INSTALLEDPLUGINS_ERR, err, "Load installed_plugins err: "+err.Error()) return } } else { // just request pluginInfos with right arch pluginInfoList, err = FetchPackageInfo(log.GetLogger(), pluginName, "", true) if err != nil { exitCode, _ = errProcess(funcName, GET_ONLINE_PACKAGE_INFO_ERR, err, "Get plugin info from online err: "+err.Error()) return } } printPluginInfo(&pluginInfoList) return } // 打印常驻插件的状态,包括已删除的常驻插件 func (pm *PluginManager) ShowPluginStatus() (exitCode int, err error) { log.GetLogger().Infoln("Enter showPluginStatus") funcName := "ShowPluginStatus" exitCode = SUCCESS pluginList, err := getAllInstalledPlugins() if err != nil { exitCode, _ = errProcess(funcName, LOAD_INSTALLEDPLUGINS_ERR, err, "Load installed_plugins err: "+err.Error()) return } log.GetLogger().Infof("Count of installed plugins: %d", len(pluginList)) statusList := []PluginStatus{} paramList := []string{"--status"} for _, plugin := range pluginList { timeout := 60 code := 0 if t, err := strconv.ParseInt(plugin.Timeout, 10, 0); err == nil { timeout = int(t) } if plugin.PluginType() == PLUGIN_PERSIST { status := PluginStatus{ Name: plugin.Name, Version: plugin.Version, Status: PERSIST_FAIL, } if plugin.IsRemoved { status.Status = REMOVED } else { pluginDir := filepath.Join(pm.pluginRoot, plugin.Name, plugin.Version) env := []string{ "PLUGIN_DIR=" + pluginDir, } cmdPath := filepath.Join(pluginDir, plugin.RunPath) code, _, err = pm.executePlugin(cmdPath, paramList, timeout, env, true) if code == 0 && err == nil { status.Status = PERSIST_RUNNING } if err != nil { log.GetLogger().Errorf("ShowPluginStatus: executePlugin err, pluginName[%s] pluginVersion[%s]", plugin.Name, plugin.Version) } } statusList = append(statusList, status) } } content, err := fuzzyjson.Marshal(&statusList) if err != nil { log.GetLogger().Error("ShowPluginStatus err when marshal statusList, err: ", err.Error()) } fmt.Println(content) return } func (pm *PluginManager) ExecutePlugin(fetchOptions *ExecFetchOptions, executeParams *ExecuteParams) (exitCode int, err error) { log.GetLogger().Infoln("Enter ExecutePlugin") if pm.Verbose { log.GetLogger().WithFields(log.Fields{ "fetchOptions": fetchOptions, "executeParams": executeParams, }).Infof("ExecutePlugin") } if fetchOptions.File != "" { return pm.executePluginFromFile(fetchOptions.File, fetchOptions.FetchTimeoutInSeconds, executeParams) } // execute plugin exe-file return pm.executePluginOnlineOrLocal(fetchOptions, executeParams) } // 根据插件名称删除插件,会删除该插件的整个目录(包括其中各版本的目录) // 一次性插件:直接删除相应的目录并将installed_plugins中对应的插件标记为已删除(isRemoved=true) // 常驻型插件:删除之前先调用插件的 --stop和 --uninstall,如果--uninstall退出码非0则不删除,否则像一次性插件一样删除目录并标记 func (pm *PluginManager) RemovePlugin(pluginName string) (exitCode int, err error) { defer func() { if exitCode != 0 || err != nil { fmt.Printf("RemovePlugin error, plugin[%s], err: %v\n", pluginName, err) } else { fmt.Printf("RemovePlugin success, plugin[%s]\n", pluginName) } }() const funcName = "RemovePlugin" idx, pluginInfo, err := getInstalledPluginNotRemovedByName(pluginName) if err != nil { exitCode, _ = errProcess(funcName, LOAD_INSTALLEDPLUGINS_ERR, err, "Load installed_plugins err: "+err.Error()) return } if pluginInfo == nil { exitCode, _ = errProcess(funcName, PACKAGE_NOT_FOUND, err, "plugin not exist "+pluginName) err = errors.New("Plugin " + pluginName + " not found in installed_plugins") return } var pluginLockFile *os.File pluginLockFile, err = openPluginLockFile(pluginName) if err != nil { err = NewOpenPluginLockFileError(err) exitCode, _ = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to remove plugin[%s]: %s", pluginName, err.Error())) return } defer pluginLockFile.Close() if err = filelock.TryLock(pluginLockFile); err != nil { err = NewAcquirePluginExclusiveLockError(err) exitCode, _ = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to remove plugin[%s]: %s", pluginName, err.Error())) return } defer filelock.Unlock(pluginLockFile) if pluginInfo.PluginType() == PLUGIN_PERSIST { // 常驻型插件 var ( envPluginDir string envPrePluginDir string ) cmdPath := filepath.Join(pm.pluginRoot, pluginInfo.Name, pluginInfo.Version, pluginInfo.RunPath) envPluginDir = filepath.Join(pm.pluginRoot, pluginInfo.Name, pluginInfo.Version) var timeout int if timeout, err = strconv.Atoi(pluginInfo.Timeout); err != nil { timeout = 60 } env := []string{ "PLUGIN_DIR=" + envPluginDir, "PRE_PLUGIN_DIR=" + envPrePluginDir, } // --stop 停止插件进程 paramList := []string{"--stop"} pm.executePlugin(cmdPath, paramList, timeout, env, false) // --uninstall 卸载插件服务 paramList = []string{"--uninstall"} exitCode, _, err = pm.executePlugin(cmdPath, paramList, timeout, env, false) if exitCode != 0 || err != nil { return } } pluginInfo.IsRemoved = true // 标记为已删除 // 更新installed_plugins文件 if err = updateInstalledPlugin(idx, pluginInfo); err != nil { exitCode, _ = errProcess(funcName, DUMP_INSTALLEDPLUGINS_ERR, err, "Update installed_plugins file err: "+err.Error()) return } sysTagType := "" if pluginInfo.AddSysTag { sysTagType = RemoveSysTag } if err = pm.ReportPluginStatus(pluginInfo.Name, pluginInfo.Version, REMOVED, sysTagType); err != nil { log.GetLogger().Errorf("Plugin[%s] is removed, but report the removed plugin to server error: %s", pluginInfo.Name, err.Error()) } // 删除插件目录 pluginDir := filepath.Join(pm.pluginRoot, pluginInfo.Name) if err = os.RemoveAll(pluginDir); err != nil { exitCode, _ = errProcess(funcName, REMOVE_FILE_ERR, err, fmt.Sprintf("Remove plugin directory err, pluginDir[%s], err: %s", pluginDir, err.Error())) return } return } // run plugin from plugin_file.zip func (pm *PluginManager) executePluginFromFile(packagePath string, fetchTimeoutInSeconds int, executeParams *ExecuteParams) (exitCode int, err error) { const funcName = "ExecutePluginFromFile" log.GetLogger().Infof("Enter executePluginFromFile") var ( // variables for metrics pluginName string pluginVersion string resource string = FetchFromLocalFile errorCode string localArch string pluginType string // 执行插件时要注入的环境变量 envPrePluginDir string // 如果已有同名的其他版本插件,表示原有同名插件的执行目录;否则为空 ) defer func() { metrics.GetPluginExecuteEvent( "pluginName", pluginName, "pluginVersion", pluginVersion, "pluginType", pluginType, "resource", resource, // 插件来源,文件、本地已安装的、线上拉取的 "exitCode", fmt.Sprint(exitCode), "errorCode", errorCode, // 错误码,plugin-manager定义的错误码,例如 PACKAGE_NOT_FOUND,如果这个字段非空表示插件没有正确执行,exitCode是plugin-manager定义的退出码;否则表示插件被正确调用,exitCode是插件执行的退出码 "localArch", localArch, "localOsType", osutil.GetOsType(), ).ReportEventSync() }() if pm.Verbose { fmt.Println("Execute plugin from file: ", packagePath) } localArch, _ = GetArch() exitCode = SUCCESS if !fileutil.CheckFileIsExist(packagePath) { err = errors.New("File not exist: " + packagePath) exitCode, errorCode = errProcess(funcName, PACKAGE_NOT_FOUND, err, "Package file not exist: "+packagePath) return } const compressedConfigPath = "config.json" var configBody []byte configBody, err = zipfile.PeekFile(packagePath, compressedConfigPath) if err != nil { if errors.Is(err, zip.ErrFormat) { errorMessage := "Package file isn`t a zip file: " + packagePath err = errors.New(errorMessage) exitCode, errorCode = errProcess(funcName, PACKAGE_FORMAT_ERR, err, errorMessage) return } else if errors.Is(err, os.ErrNotExist) { errorMessage := fmt.Sprintf("Manifest file config.json does not exist in %s.", packagePath) err = errors.New(errorMessage) exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, errorMessage) return } else { exitCode, errorCode = errProcess(funcName, UNZIP_ERR, err, fmt.Sprintf("Unzip err, file is [%s], target file is [%s], err is [%s]", packagePath, compressedConfigPath, err.Error())) return } } configContent := string(configBody) config := pluginConfig{} if err = fuzzyjson.Unmarshal(configContent, &config); err != nil { exitCode, errorCode = errProcess(funcName, UNMARSHAL_ERR, err, fmt.Sprintf("Unmarshal config.json err, config.json is [%s], err is [%s]", configContent, err.Error())) return } pluginName = config.Name pluginVersion = config.Version // 检查系统类型和架构是否符合 if config.OsType != "" && strings.ToLower(config.OsType) != osutil.GetOsType() { err = errors.New("Plugin ostype not suit for this system") exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin ostype[%s] not suit for this system[%s]", config.OsType, osutil.GetOsType())) return } if config.Arch != "" && strings.ToLower(config.Arch) != "all" && strings.ToLower(config.Arch) != localArch { err = errors.New("Plugin arch not suit for this system") exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin arch[%s] not suit for this system[%s]", config.Arch, localArch)) return } pluginIndex, plugin, err := getInstalledPluginByName(config.Name) if err != nil { exitCode, errorCode = errProcess(funcName, LOAD_INSTALLEDPLUGINS_ERR, err, "Load installed_plugins err: "+err.Error()) return } if plugin != nil && plugin.IsRemoved { // 之前的同名插件已经被删除,相当于重新安装 if err = deleteInstalledPluginByIndex(pluginIndex); err != nil { exitCode, errorCode = errProcess(funcName, DUMP_INSTALLEDPLUGINS_ERR, err, "Update installed_plugins file err: "+err.Error()) return } pluginIndex = -1 plugin = nil } if plugin != nil { envPrePluginDir = filepath.Join(pm.pluginRoot, plugin.Name, plugin.Version) // has installed, check version if versionutil.CompareVersion(config.Version, plugin.Version) <= 0 { if !pm.Yes { yn := "" for { fmt.Printf("[%s %s] has installed, this package version[%s] is not newer, still install ? [y/n]: \n", plugin.Name, plugin.Version, config.Version) fmt.Scanln(&yn) if yn == "y" || yn == "n" { break } } if yn == "n" { log.GetLogger().Infoln("Execute plugin cancel...") fmt.Println("Execute plugin cancel...") return } } fmt.Printf("[%s %s] has installed, this package version[%s] is not newer, still install...\n", plugin.Name, plugin.Version, config.Version) } else { fmt.Printf("[%s %s] has installed, this package version[%s] is newer, keep install...\n", plugin.Name, plugin.Version, config.Version) } } // Acquire plugin-wise shared lock var pluginLockFile *os.File pluginLockFile, err = openPluginLockFile(config.Name) if err != nil { err = NewOpenPluginLockFileError(err) exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error())) return } defer pluginLockFile.Close() if err = filelock.TryRLock(pluginLockFile); err != nil { err = NewAcquirePluginExclusiveLockError(err) exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error())) return } defer filelock.Unlock(pluginLockFile) var fetched *Fetched var exitingErr ExitingError fetchTimeout := time.Duration(fetchTimeoutInSeconds) * time.Second guardErr := InstallLockGuard{ PluginName: config.Name, PluginVersion: config.Version, AcquireSharedLockTimeout: fetchTimeout, }.Do(func() { // Double-check if specified plugin has been installed by another // plugin-manager process, that releases plugin-version-wise // exclusive lock and then this acquired. fetched, exitingErr = pm.queryFromLocalOnly(config.Name, config.Version) if exitingErr == nil || !errors.Is(exitingErr, ErrPackageNotFound) { return } fetched, exitingErr = pm.installFromFile(packagePath, &config, plugin, pluginIndex, fetchTimeout) }, func() { fetched, exitingErr = pm.queryFromLocalOnly(config.Name, config.Version) // TODO-FIXME: envPrePluginDir SHOULD be constructed on demand only fetched.EnvPrePluginDir = envPrePluginDir }) if guardErr != nil { err = guardErr exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error())) return } if exitingErr != nil { err = exitingErr.Unwrap() exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), exitingErr.Unwrap(), exitingErr.Error()) return } fmt.Printf("Plugin[%s] installed!\n", fetched.PluginName) pluginName = fetched.PluginName pluginVersion = fetched.PluginVersion pluginType = fetched.PluginType executionTimeoutInSeconds := fetched.ExecutionTimeoutInSeconds if executeParams.OptionalExecutionTimeoutInSeconds != nil { executionTimeoutInSeconds = *executeParams.OptionalExecutionTimeoutInSeconds } args := executeParams.SplitArgs() env := []string{ "PLUGIN_DIR=" + fetched.EnvPluginDir, "PRE_PLUGIN_DIR=" + fetched.EnvPrePluginDir, } var options []process.CmdOption options, err = prepareCmdOptions(executeParams) if err != nil { exitCode, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options for plugin[%s %s]: %s", pluginName, pluginVersion, err.Error())) return } exitCode, errorCode, err = pm.executePlugin(fetched.Entrypoint, args, executionTimeoutInSeconds, env, false, options...) // 常驻插件和一次性插件:需要主动上报一次插件状态 if pluginType == PLUGIN_PERSIST { status, err := pm.CheckAndReportPlugin(pluginName, pluginVersion, fetched.Entrypoint, executionTimeoutInSeconds, env, "") log.GetLogger().WithFields(log.Fields{ "pluginName": pluginName, "pluginVersion": pluginVersion, "cmdPath": fetched.Entrypoint, "timeout": executionTimeoutInSeconds, "env": env, "status": status, }).WithError(err).Infof("CheckAndReportPlugin") } else if pluginType == PLUGIN_ONCE { pm.ReportPluginStatus(pluginName, pluginVersion, ONCE_INSTALLED, "") } return } func (pm *PluginManager) installFromFile(packagePath string, config *pluginConfig, plugin *PluginInfo, pluginIndex int, timeout time.Duration) (*Fetched, ExitingError) { ctx := context.Background() var cancel context.CancelFunc if timeout > 0 { ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } var envPrePluginDir string if pluginIndex == -1 { plugin = &PluginInfo{ Timeout: "60", } } else { envPrePluginDir = filepath.Join(pm.pluginRoot, plugin.Name, plugin.Version) } executionTimeoutInSeconds := 60 if t, err := strconv.Atoi(config.Timeout); err != nil { config.Timeout = plugin.Timeout } else { executionTimeoutInSeconds = t } plugin.Name = config.Name plugin.Arch = config.Arch plugin.OSType = config.OsType plugin.RunPath = config.RunPath plugin.Timeout = config.Timeout plugin.Publisher = config.Publisher plugin.Version = config.Version plugin.AddSysTag = config.AddSysTag plugin.SetPluginType(config.PluginType()) plugin.Url = "local" if config.HeartbeatInterval <= 0 { plugin.HeartbeatInterval = 60 } else { plugin.HeartbeatInterval = config.HeartbeatInterval } // TODO-FIXME: Calculating the MD5 checksum for plugin package is also a // time-consuming procedure, which should be cancelable when timed out md5Checksum, err := util.ComputeMd5(packagePath) if err != nil { return nil, NewMD5CheckExitingError(err, "Compute md5 of plugin file err: "+err.Error()) } plugin.Md5 = md5Checksum pluginPath := filepath.Join(pm.pluginRoot, plugin.Name, plugin.Version) pathutil.MakeSurePath(pluginPath) if err := zipfile.UnzipContext(ctx, packagePath, pluginPath, false); err != nil { return nil, NewUnzipExitingError(err, fmt.Sprintf("Unzip err, file is [%s], target dir is [%s], err is [%s]", packagePath, pluginPath, err.Error())) } if config.PluginType() == PLUGIN_COMMANDER { commanderInfoPath := filepath.Join(pluginPath, "axt-commander.json") if !fileutil.CheckFileIsExist(commanderInfoPath) { err := errors.New(fmt.Sprintf("File axt-commander.json not exist, %s.", commanderInfoPath)) return nil, NewPluginFormatExitingError(err, fmt.Sprintf("File axt-commander.json not exist, %s.", commanderInfoPath)) } commanderInfo := CommanderInfo{} if content, err := fuzzyjson.UnmarshalFile(commanderInfoPath, &commanderInfo); err != nil { return nil, NewUnmarshalExitingError(err, fmt.Sprintf("Unmarshal axt-commander.json err, axt-commander.json is [%s], err is [%s]", string(content), err.Error())) } plugin.CommanderInfo = commanderInfo } cmdPath := filepath.Join(pluginPath, config.RunPath) if !fileutil.CheckFileIsExist(cmdPath) { log.GetLogger().Infoln("Cmd file not exist: ", cmdPath) return nil, NewPluginFormatExitingError(errors.New("Cmd file not exist: "+cmdPath), fmt.Sprintf("Executable file not exist, %s.", cmdPath)) } if osutil.GetOsType() != osutil.OSWin { if err := os.Chmod(cmdPath, os.FileMode(0o744)); err != nil { return nil, NewExecutablePermissionExitingError(err, "Make plugin file executable err: "+err.Error()) } } if pluginIndex == -1 { plugin.PluginID = "local_" + plugin.Name + "_" + plugin.Version _, err = insertNewInstalledPlugin(plugin) } else { err = updateInstalledPlugin(pluginIndex, plugin) } if err != nil { return nil, NewDumpInstalledPluginsExitingError(err) } return &Fetched{ PluginName: config.Name, PluginVersion: config.Version, PluginType: config.PluginType(), AddSysTag: config.AddSysTag, Entrypoint: cmdPath, ExecutionTimeoutInSeconds: executionTimeoutInSeconds, EnvPluginDir: pluginPath, EnvPrePluginDir: envPrePluginDir, }, nil } func (pm *PluginManager) executePluginOnlineOrLocal(fetchOptions *ExecFetchOptions, executeParams *ExecuteParams) (exitCode int, err error) { const funcName = "ExecutePluginOnlineOrLocal" log.GetLogger().Info("Enter executePluginOnlineOrLocal") var ( // variables for metrics resource string errorCode string localArch string pluginName string = fetchOptions.PluginName pluginVersion string = fetchOptions.Version pluginType string = PLUGIN_ONCE ) defer func() { metrics.GetPluginExecuteEvent( "pluginName", pluginName, "pluginVersion", pluginVersion, "pluginType", pluginType, "resource", resource, // 插件来源,文件、本地已安装的、线上拉取的 "exitCode", fmt.Sprint(exitCode), "errorCode", errorCode, // 错误码,plugin-manager定义的错误码,例如 PACKAGE_NOT_FOUND,如果这个字段非空表示插件没有正确执行,exitCode是plugin-manager定义的退出码;否则表示插件被正确调用,exitCode是插件执行的退出码 "localArch", localArch, "localOsType", osutil.GetOsType(), ).ReportEventSync() }() localArch, _ = GetArch() var fetched *Fetched var queried *PluginInfo var exitingErr ExitingError if fetchOptions.Local { // execute local plugin fetched, exitingErr = pm.queryFromLocalOnly(fetchOptions.PluginName, fetchOptions.Version) } else { fetched, queried, exitingErr = pm.queryFromOnlineOrLocal(fetchOptions, localArch) } if exitingErr != nil { err = exitingErr.Unwrap() exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), exitingErr.Unwrap(), exitingErr.Error()) return } // Acquire plugin-wise shared lock var pluginLockFile *os.File pluginLockFile, err = openPluginLockFile(fetchOptions.PluginName) if err != nil { err = NewOpenPluginLockFileError(err) exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", fetchOptions.PluginName, fetchOptions.Version, err.Error())) return } defer pluginLockFile.Close() if err = filelock.TryRLock(pluginLockFile); err != nil { err = NewAcquirePluginExclusiveLockError(err) exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", fetchOptions.PluginName, fetchOptions.Version, err.Error())) return } defer filelock.Unlock(pluginLockFile) if fetched != nil { resource = FetchFromLocalInstalled } else if queried != nil { resource = FetchFromOnline fetchTimeout := time.Duration(fetchOptions.FetchTimeoutInSeconds) * time.Second guardErr := InstallLockGuard{ PluginName: queried.Name, PluginVersion: queried.Version, AcquireSharedLockTimeout: fetchTimeout, }.Do(func() { // Double-check if specified plugin has been installed by another // plugin-manager process, that releases plugin-version-wise // exclusive lock and then this acquired. fetched, exitingErr = pm.queryFromLocalOnly(queried.Name, queried.Version) if exitingErr == nil || !errors.Is(exitingErr, ErrPackageNotFound) { return } fetched, exitingErr = pm.installFromOnline(queried, fetchTimeout, localArch) }, func() { fetched, exitingErr = pm.queryFromLocalOnly(queried.Name, queried.Version) }) if guardErr != nil { err = guardErr exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", queried.Name, queried.Version, err.Error())) return } if exitingErr != nil { err = exitingErr.Unwrap() exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), exitingErr.Unwrap(), exitingErr.Error()) return } } else { // SHOULD NEVER RUN INTO THIS BRANCH err = ErrPackageNotFound exitCode, errorCode = errProcess(funcName, PACKAGE_NOT_FOUND, err, err.Error()) return } pluginName = fetched.PluginName pluginVersion = fetched.PluginVersion pluginType = fetched.PluginType executionTimeoutInSeconds := fetched.ExecutionTimeoutInSeconds if executeParams.OptionalExecutionTimeoutInSeconds != nil { executionTimeoutInSeconds = *executeParams.OptionalExecutionTimeoutInSeconds } args := executeParams.SplitArgs() env := []string{ "PLUGIN_DIR=" + fetched.EnvPluginDir, "PRE_PLUGIN_DIR=" + fetched.EnvPrePluginDir, } var options []process.CmdOption options, err = prepareCmdOptions(executeParams) if err != nil { exitCode, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options for plugin[%s %s]: %s", pluginName, pluginVersion, err.Error())) return } exitCode, errorCode, err = pm.executePlugin(fetched.Entrypoint, args, executionTimeoutInSeconds, env, false, options...) // 常驻插件:如果是从线上拉取的插件,或者调用的接口有可能改变插件状态,需要主动上报一次插件状态 // 一次性插件:如果插件是从线上拉取的,需要上报一次插件状态 sysTagType := "" if pluginType == PLUGIN_PERSIST && (needReportStatus(args) || resource == FetchFromOnline) { if resource == FetchFromOnline { if fetched.AddSysTag { sysTagType = AddSysTag } else if len(fetched.EnvPrePluginDir) != 0 { sysTagType = RemoveSysTag } } status, err := pm.CheckAndReportPlugin(pluginName, pluginVersion, fetched.Entrypoint, executionTimeoutInSeconds, env, sysTagType) log.GetLogger().WithFields(log.Fields{ "pluginName": pluginName, "pluginVersion": pluginVersion, "cmdPath": fetched.Entrypoint, "timeout": executionTimeoutInSeconds, "env": env, "status": status, }).WithError(err).Infof("CheckAndReportPlugin") } else if pluginType == PLUGIN_ONCE && resource == FetchFromOnline { if fetched.AddSysTag { sysTagType = AddSysTag } else if len(fetched.EnvPrePluginDir) != 0 { sysTagType = RemoveSysTag } pm.ReportPluginStatus(pluginName, pluginVersion, ONCE_INSTALLED, sysTagType) } return } func (pm *PluginManager) queryFromLocalOnly(pluginName string, pluginVersion string) (*Fetched, ExitingError) { localInfo, err := getLocalPluginInfo(pluginName, pluginVersion) if err != nil { return nil, NewLoadInstalledPluginsExitingError(err) } if localInfo == nil { return nil, NewPackageNotFoundExitingError(ErrPackageNotFound, fmt.Sprintf("Could not found local package [%s]", pluginName)) } envPluginDir := filepath.Join(pm.pluginRoot, localInfo.Name, localInfo.Version) executionTimeoutInSeconds := 60 if t, err := strconv.Atoi(localInfo.Timeout); err == nil { executionTimeoutInSeconds = t } return &Fetched{ PluginName: localInfo.Name, PluginVersion: localInfo.Version, PluginType: localInfo.PluginType(), AddSysTag: localInfo.AddSysTag, Entrypoint: filepath.Join(envPluginDir, localInfo.RunPath), ExecutionTimeoutInSeconds: executionTimeoutInSeconds, EnvPluginDir: envPluginDir, EnvPrePluginDir: "", }, nil } // didn't set --local, so local & online both try func (pm *PluginManager) queryFromOnlineOrLocal(fetchOptions *ExecFetchOptions, localArch string) (*Fetched, *PluginInfo, ExitingError) { localInfo, err := getLocalPluginInfo(fetchOptions.PluginName, fetchOptions.Version) if err != nil { return nil, nil, NewLoadInstalledPluginsExitingError(err) } onlineInfo, onlineOtherArch, err := getOnlinePluginInfo(fetchOptions.PluginName, fetchOptions.Version) if err != nil { return nil, nil, NewGetOnlinePackageInfoExitingError(err, "Get plugin info from online err: "+err.Error()) } useLocal := false if localInfo != nil { if onlineInfo != nil { // 本地和线上版本一致,使用本地插件文件 if versionutil.CompareVersion(localInfo.Version, onlineInfo.Version) == 0 { log.GetLogger().Infof("ExecutePluginOnlineOrLocal: Plugin[%s], local version[%s] same to online version[%s], so use local package", fetchOptions.PluginName, localInfo.Version, onlineInfo.Version) useLocal = true } else { // 本地和线上版本不一致,使用线上版本 log.GetLogger().Infof("ExecutePluginOnlineOrLocal: Plugin[%s], local version[%s] different from online version[%s], so use online package", fetchOptions.PluginName, localInfo.Version, onlineInfo.Version) } } else { useLocal = true } } else { if onlineInfo == nil { var tip string if len(onlineOtherArch) == 0 { tip = fmt.Sprintf("Could not found both local and online, package[%s] version[%s]\n", fetchOptions.PluginName, fetchOptions.Version) } else { tip = fmt.Sprintf("Could not found local package[%s] version[%s], found online package but it`s arch[%s] not match local_arch[%s] \n", fetchOptions.PluginName, fetchOptions.Version, strings.Join(onlineOtherArch, ", "), localArch) } return nil, nil, NewPackageNotFoundExitingError(ErrPackageNotFound, tip) } } // use local package if useLocal { executionTimeoutInSeconds := 60 if t, err := strconv.Atoi(localInfo.Timeout); err == nil { executionTimeoutInSeconds = t } pluginPath := filepath.Join(pm.pluginRoot, localInfo.Name, localInfo.Version) return &Fetched{ PluginName: localInfo.Name, PluginVersion: localInfo.Version, PluginType: localInfo.PluginType(), Entrypoint: filepath.Join(pluginPath, localInfo.RunPath), ExecutionTimeoutInSeconds: executionTimeoutInSeconds, EnvPluginDir: pluginPath, EnvPrePluginDir: "", }, nil, nil } return nil, onlineInfo, nil } // 下载并安装插件 // pull package func (pm *PluginManager) installFromOnline(onlineInfo *PluginInfo, timeout time.Duration, localArch string) (*Fetched, ExitingError) { ctx := context.Background() var cancel context.CancelFunc if timeout > 0 { ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } filePath := filepath.Join(pm.pluginRoot, onlineInfo.Name+".zip") log.GetLogger().Infof("Downloading package from [%s], save to [%s] ", onlineInfo.Url, filePath) const maxRetries = 3 const retryDelay = 3 * time.Second var err error if timeout == 0 { err = func(remoteUrl string, localPath string) error { var err2 error for retry := maxRetries; retry > 0; retry-- { err2 = util.HttpDownlod(remoteUrl, localPath) if err2 == nil { break } time.Sleep(retryDelay) } return err2 }(onlineInfo.Url, filePath) } else { err = func(ctx context.Context, remoteUrl string, localPath string) error { var err2 error for retry := maxRetries; retry > 0; retry-- { err2 = util.HttpDownloadContext(ctx, remoteUrl, localPath) if err2 == nil { break } delayTimer := time.NewTimer(retryDelay) select { case <-ctx.Done(): return ctx.Err() case <-delayTimer.C: continue } } return err2 }(ctx, onlineInfo.Url, filePath) } if err != nil { return nil, NewDownloadExitingError(err, fmt.Sprintf("Downloading package failed, plugin.Url is [%s], err is [%s]", onlineInfo.Url, err.Error())) } log.GetLogger().Infoln("Check MD5...") // TODO-FIXME: Calculating the MD5 checksum for plugin package is also a // time-consuming procedure, which should be cancelable when timed out md5Checksum, err := util.ComputeMd5(filePath) if err != nil { return nil, NewMD5CheckExitingError(err, fmt.Sprintf("Compute md5 of plugin file[%s] err, plugin.Url is [%s], err is [%s]", filePath, onlineInfo.Url, err.Error())) } if strings.ToLower(md5Checksum) != strings.ToLower(onlineInfo.Md5) { log.GetLogger().Errorf("Md5 not match, onlineInfo.Md5[%s], package file md5[%s]\n", onlineInfo.Md5, md5Checksum) return nil, NewMD5CheckExitingError(errors.New("Md5 not macth"), fmt.Sprintf("Md5 not match, onlineInfo.Md5 is [%s], real md5 is [%s], plugin.Url is [%s]", onlineInfo.Md5, md5Checksum, onlineInfo.Url)) } unzipdir := filepath.Join(pm.pluginRoot, onlineInfo.Name, onlineInfo.Version) pathutil.MakeSurePath(unzipdir) log.GetLogger().Infoln("Unzip package...") if err := zipfile.UnzipContext(ctx, filePath, unzipdir, false); err != nil { return nil, NewUnzipExitingError(err, fmt.Sprintf("Unzip package err, plugin.Url is [%s], err is [%s]", onlineInfo.Url, err.Error())) } os.RemoveAll(filePath) config_path := filepath.Join(unzipdir, "config.json") if !fileutil.CheckFileIsExist(config_path) { err := errors.New(fmt.Sprintf("File config.json not exist, %s.", config_path)) return nil, NewPluginFormatExitingError(err, fmt.Sprintf("File config.json not exist, %s.", config_path)) } config := pluginConfig{} if content, err := fuzzyjson.UnmarshalFile(config_path, &config); err != nil { return nil, NewUnmarshalExitingError(err, fmt.Sprintf("Unmarshal config.json err, config.json is [%s], err is [%s]", string(content), err.Error())) } if config.HeartbeatInterval <= 0 { config.HeartbeatInterval = 60 } if config.PluginType() != onlineInfo.PluginType() { tip := fmt.Sprintf("config.PluginType[%s] not match to pluginType[%s]", config.PluginType(), onlineInfo.PluginType()) return nil, NewPluginFormatExitingError(errors.New(tip), tip) } // 接口返回的插件信息中没有HeartbeatInterval字段,需要以插件包中的config.json为准 onlineInfo.HeartbeatInterval = config.HeartbeatInterval onlineInfo.SetPluginType(config.PluginType()) onlineInfo.AddSysTag = config.AddSysTag if config.PluginType() == PLUGIN_COMMANDER { commanderInfoPath := filepath.Join(unzipdir, "axt-commander.json") if !fileutil.CheckFileIsExist(commanderInfoPath) { err := errors.New(fmt.Sprintf("File axt-commander.json not exist, %s.", commanderInfoPath)) return nil, NewPluginFormatExitingError(err, fmt.Sprintf("File axt-commander.json not exist, %s.", commanderInfoPath)) } commanderInfo := CommanderInfo{} if content, err := fuzzyjson.UnmarshalFile(commanderInfoPath, &commanderInfo); err != nil { return nil, NewUnmarshalExitingError(err, fmt.Sprintf("Unmarshal axt-commander.json err, axt-commander.json is [%s], err is [%s]", string(content), err.Error())) } onlineInfo.CommanderInfo = commanderInfo } cmdPath := filepath.Join(unzipdir, config.RunPath) // 检查系统类型和架构是否符合 if strings.ToLower(onlineInfo.OSType) != "both" && strings.ToLower(onlineInfo.OSType) != osutil.GetOsType() { err := errors.New("Plugin ostype not suit for this system") return nil, NewPluginFormatExitingError(err, fmt.Sprintf("Plugin ostype[%s] not suit for this system[%s], plugin.Url is [%s]", onlineInfo.OSType, osutil.GetOsType(), onlineInfo.Url)) } if strings.ToLower(onlineInfo.Arch) != "all" && strings.ToLower(onlineInfo.Arch) != localArch { err := errors.New("Plugin arch not suit for this system") return nil, NewPluginFormatExitingError(err, fmt.Sprintf("Plugin arch[%s] not suit for this system[%s], plugin.Url is [%s]", onlineInfo.Arch, localArch, onlineInfo.Url)) } if !fileutil.CheckFileIsExist(cmdPath) { log.GetLogger().Infoln("Cmd file not exist: ", cmdPath) err := errors.New("Cmd file not exist: " + cmdPath) return nil, NewPluginFormatExitingError(err, fmt.Sprintf("Executable file not exist, %s.", cmdPath)) } if osutil.GetOsType() != osutil.OSWin { if err = os.Chmod(cmdPath, os.FileMode(0o744)); err != nil { return nil, NewExecutablePermissionExitingError(err, fmt.Sprintf("Make plugin file executable err, plugin.Url is [%s], err is [%s]", onlineInfo.Url, err.Error())) } } // update INSTALLEDPLUGINS file var envPrePluginDir string pluginIndex, pluginInfo, err := getInstalledPluginByName(onlineInfo.Name) if err != nil { return nil, NewLoadInstalledPluginsExitingError(err) } if pluginIndex != -1 && pluginInfo.IsRemoved { // Actually from the database remove the record of removed plugin if err = deleteInstalledPluginByIndex(pluginIndex); err != nil { return nil, NewDumpInstalledPluginsExitingError(err) } pluginIndex = -1 } if pluginIndex == -1 { _, err = insertNewInstalledPlugin(onlineInfo) } else { envPrePluginDir = filepath.Join(pm.pluginRoot, pluginInfo.Name, pluginInfo.Version) err = updateInstalledPlugin(pluginIndex, onlineInfo) } if err != nil { return nil, NewDumpInstalledPluginsExitingError(err) } executionTimeoutInSeconds := 60 if t, err := strconv.Atoi(config.Timeout); err == nil { executionTimeoutInSeconds = t } return &Fetched{ PluginName: config.Name, PluginVersion: config.Version, PluginType: config.PluginType(), AddSysTag: config.AddSysTag, Entrypoint: cmdPath, ExecutionTimeoutInSeconds: executionTimeoutInSeconds, EnvPluginDir: filepath.Join(pm.pluginRoot, config.Name, config.Version), EnvPrePluginDir: envPrePluginDir, }, nil } func (pm *PluginManager) executePlugin(cmdPath string, paramList []string, timeout int, env []string, quiet bool, options ...process.CmdOption) (exitCode int, errorCode string, err error) { log.GetLogger().Infof("Enter executePlugin, cmdPath[%s] paramList[%v] paramCount[%d] timeout[%d]\n", cmdPath, paramList, len(paramList), timeout) funcName := "ExecutePlugin" if !fileutil.CheckFileIsExist(cmdPath) { log.GetLogger().Infoln("Cmd file not exist: ", cmdPath) err = errors.New("Cmd file not exist: " + cmdPath) exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Executable file not exist, %s.", cmdPath)) return } if pm.Verbose { fmt.Printf("Run cmd: %s, params: %v\n", cmdPath, paramList) } processCmd := process.NewProcessCmd(options...) // set environment variable if env != nil && len(env) > 0 { processCmd.SetEnv(env) } status := process.Success commandName := cmdPath if filepath.Ext(cmdPath) == ".ps1" { commandName = "powershell" paramList = append([]string{cmdPath}, paramList...) } if quiet { exitCode, status, err = processCmd.SyncRun("", commandName, paramList, nil, nil, os.Stdin, nil, timeout) } else { exitCode, status, err = processCmd.SyncRun("", commandName, paramList, os.Stdout, os.Stderr, os.Stdin, nil, timeout) } if status == process.Fail { exitCode = EXECUTE_FAILED } else if status == process.Timeout { exitCode = EXECUTE_TIMEOUT } if !quiet { switch exitCode { case EXECUTE_FAILED: _, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Execute plugin failed, err: %v", err)) case EXECUTE_TIMEOUT: _, errorCode = errProcess(funcName, EXECUTE_TIMEOUT, err, fmt.Sprintf("Execute plugin timeout, timeout[%d] err: %v", timeout, err)) } } log.GetLogger().Info(fmt.Sprintf("executePlugin: commandName: %s, params: %+q, exitCode: %d, timeout: %d, env: %v, err: %v\n", commandName, paramList, exitCode, timeout, env, err)) return } func (pm *PluginManager) VerifyPlugin(fetchOptions *VerifyFetchOptions, executeParams *ExecuteParams) (exitCode int, err error) { const funcName = "VerifyPlugin" log.GetLogger().WithFields(log.Fields{ "fetchOptions": fetchOptions, "executeParams": executeParams, }).Infoln("Enter VerifyPlugin") // pull package unzipdir := filepath.Join(pm.pluginRoot, "verify_plugin_test") exitCode, err = func(packageUrl string, timeoutInSeconds int) (int, error) { ctx := context.Background() var cancel context.CancelFunc if timeoutInSeconds > 0 { ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutInSeconds)*time.Second) defer cancel() } log.GetLogger().Infoln("Downloading package from ", packageUrl) filePath, err := func(packageUrl string, timeoutInSeconds int) (string, error) { fileName := packageUrl[strings.LastIndex(packageUrl, "/")+1:] filePath := filepath.Join(pm.pluginRoot, fileName) if len(packageUrl) > 4 && packageUrl[:4] == "http" { return filePath, util.HttpDownloadContext(ctx, packageUrl, filePath) } else { return filePath, FileProtocolDownload(packageUrl, filePath) } }(packageUrl, timeoutInSeconds) if err != nil { exitcode, _ := errProcess(funcName, DOWNLOAD_FAIL, err, fmt.Sprintf("Downloading package failed, url is [%s], err is [%s]", packageUrl, err.Error())) return exitcode, err } pathutil.MakeSurePath(unzipdir) log.GetLogger().Infoln("Unzip package...") if err := zipfile.UnzipContext(ctx, filePath, unzipdir, false); err != nil { exitcode, _ := errProcess(funcName, UNZIP_ERR, err, fmt.Sprintf("Unzip package err, url is [%s], err is [%s]", packageUrl, err.Error())) return exitcode, err } os.RemoveAll(filePath) return 0, nil }(fetchOptions.Url, fetchOptions.FetchTimeoutInSeconds) if err != nil { return } configPath := filepath.Join(unzipdir, "config.json") if !fileutil.CheckFileIsExist(configPath) { err = errors.New("Can not find the config.json") exitCode, _ = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("File config.json not exist, %s.", configPath)) return } config := pluginConfig{} var content []byte if content, err = fuzzyjson.UnmarshalFile(configPath, &config); err != nil { exitCode, _ = errProcess(funcName, UNMARSHAL_ERR, err, fmt.Sprintf("Unmarshal config.json err, config.json is [%s], err is [%s]", string(content), err.Error())) return } // 检查系统类型和架构是否符合 if config.OsType != "" && strings.ToLower(config.OsType) != "both" && strings.ToLower(config.OsType) != osutil.GetOsType() { err = errors.New("Plugin ostype not suit for this system") exitCode, _ = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin ostype[%s] not suit for this system[%s], url is [%s]", config.OsType, osutil.GetOsType(), fetchOptions.Url)) return } localArch, _ := GetArch() if config.Arch != "" && strings.ToLower(config.Arch) != "all" && strings.ToLower(config.Arch) != localArch { err = errors.New("Plugin arch not suit for this system") exitCode, _ = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin arch[%s] not suit for this system[%s], url is [%s]", config.Arch, localArch, fetchOptions.Url)) return } cmdPath := filepath.Join(unzipdir, config.RunPath) if !fileutil.CheckFileIsExist(cmdPath) { err = errors.New("Can not find the cmd file") exitCode, _ = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Executable file not exist, %s.", cmdPath)) return } if osutil.GetOsType() != osutil.OSWin { if err = os.Chmod(cmdPath, os.FileMode(0o744)); err != nil { exitCode, _ = errProcess(funcName, EXECUTABLE_PERMISSION_ERR, err, "Make plugin file executable err: "+err.Error()) return } } executionTimeoutInSeconds := 60 if t, err := strconv.Atoi(config.Timeout); err != nil { fmt.Println("config.Timeout is invalid: ", config.Timeout) } else { executionTimeoutInSeconds = t } if executeParams.OptionalExecutionTimeoutInSeconds != nil { executionTimeoutInSeconds = *executeParams.OptionalExecutionTimeoutInSeconds } // 执行插件时要注入的环境变量 env := []string{ // 当前执行的插件的执行目录 "PLUGIN_DIR=" + unzipdir, // 如果已有同名插件,表示已有同名插件的执行目录;否则为空 "PRE_PLUGIN_DIR=", } var options []process.CmdOption options, err = prepareCmdOptions(executeParams) if err != nil { exitCode, _ = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options: %s", err.Error())) return } exitCode, _, err = pm.executePlugin(cmdPath, executeParams.SplitArgs(), executionTimeoutInSeconds, env, false, options...) return } // 向服务端上报某个插件状态 func (pm *PluginManager) ReportPluginStatus(pluginName, pluginVersion, status string, sysTagType string) error { if len(pluginName) > PLUGIN_NAME_MAXLEN { pluginName = pluginName[:PLUGIN_NAME_MAXLEN] } if len(pluginVersion) > PLUGIN_VERSION_MAXLEN { pluginVersion = pluginVersion[:PLUGIN_VERSION_MAXLEN] } pluginStatusRequest := PluginStatusResquest{ Plugin: []PluginStatus{ { Name: pluginName, Version: pluginVersion, Status: status, SysTagType: sysTagType, }, }, } requestPayloadBytes, err := fuzzyjson.Marshal(pluginStatusRequest) if err != nil { log.GetLogger().WithError(err).Error("ReportPluginStatus: pluginStatusList marshal err: " + err.Error()) return err } requestPayload := string(requestPayloadBytes) url := util.GetPluginHealthService() _, err = util.HttpPost(url, requestPayload, "") for i := 0; i < 3 && err != nil; i++ { log.GetLogger().Infof("ReportPluginStatus: upload pluginStatusList fail, need retry: %s", requestPayload) time.Sleep(time.Duration(2) * time.Second) _, err = util.HttpPost(url, requestPayload, "") } return err } // 检查并上报常驻插件状态 func (pm *PluginManager) CheckAndReportPlugin(pluginName, pluginVersion, cmdPath string, timeout int, env []string, sysTagType string) (status string, err error) { exitCode := 0 status = PERSIST_UNKNOWN exitCode, _, err = pm.executePlugin(cmdPath, []string{"--status"}, timeout, env, true) if err != nil { return } if exitCode != 0 { status = PERSIST_FAIL } else { status = PERSIST_RUNNING } return status, pm.ReportPluginStatus(pluginName, pluginVersion, status, sysTagType) } func needReportStatus(paramsList []string) bool { for _, p := range paramsList { for _, pp := range NEED_REFRESH_STATUS_API { if p == pp { return true } } } return false } func (pm *PluginManager) InstallPluginFromOnline(onlineInfo *PluginInfo, timeout int) error { localArch, _ := GetArch() _, err := pm.installFromOnline(onlineInfo, time.Second*time.Duration(timeout), localArch) return err } func QueryPluginFromOnline(pluginName, pluginType, version string) (*PluginInfo, error) { pluginInfos, err := FetchPackageInfo(log.GetLogger(), pluginName, version, true) if err != nil { return nil, err } if len(pluginInfos) == 0 { return nil, fmt.Errorf("not found") } var res *PluginInfo for i, _ := range pluginInfos { if pluginType != "" && pluginInfos[i].PluginType() != pluginType { continue } if version != "" { if pluginInfos[i].Version == version { res = &pluginInfos[i] break } } else { if res == nil || versionutil.CompareVersion(pluginInfos[i].Version, res.Version) > 0 { res = &pluginInfos[i] } } } if res == nil { return nil, fmt.Errorf("not found") } return res, nil } func QueryPluginFromLocal(pluginName, pluginType string) (*PluginInfo, error) { plugins, err := getInstalledPluginsByName(pluginName) if err != nil { return nil, err } var res *PluginInfo for i, _ := range plugins { if plugins[i].IsRemoved || (pluginType != "" && plugins[i].PluginType() != pluginType) { continue } res = &plugins[i] break } if res == nil { return nil, fmt.Errorf("not found") } return res, nil } func QueryPluginFromLocalPreInstalled(pluginName, pluginType string) (*PluginInfo, error) { preInstalledPlugins, err := getAllPreInstalledPlugins() if err != nil { return nil, err } var res *PluginInfo for i, _ := range preInstalledPlugins { if preInstalledPlugins[i].IsRemoved || preInstalledPlugins[i].Name != pluginName || (pluginType != "" && preInstalledPlugins[i].PluginType() != pluginType) { continue } res = &preInstalledPlugins[i] break } if res == nil { return nil, fmt.Errorf("not found") } return res, nil } func LoadAllPluginFromLocal(pluginType string) ([]PluginInfo, error) { plugins, err := getAllInstalledPlugins() if err != nil { return nil, err } if pluginType != "" { res := []PluginInfo{} for _, p := range plugins { if p.PluginType() == pluginType && !p.IsRemoved { res = append(res, p) } } return res, nil } return plugins, nil } func (pm *PluginManager) GetPluginCommandPath(pluginInfo *PluginInfo) string { return filepath.Join(pm.pluginRoot, pluginInfo.Name, pluginInfo.Version, pluginInfo.RunPath) }