in agent/pluginmanager/acspluginmanager/acspluginmanager.go [325:531]
// executePluginFromFile installs the plugin contained in the local package
// file packagePath and then runs its entrypoint.
//
// The steps, in order, are:
//  1. Check that packagePath exists and read the config.json manifest out of
//     the zip package without extracting it.
//  2. Reject the package if the manifest's OS type or architecture does not
//     match the local machine.
//  3. Look up an installed plugin with the same name. A record flagged as
//     removed is deleted first. If the package version is not newer than the
//     installed one, the user is asked for confirmation unless pm.Yes is set.
//  4. Take the plugin-wise shared lock, then install (or reuse an existing
//     installation) through InstallLockGuard.
//  5. Execute the plugin entrypoint and report the status of persistent and
//     one-shot plugins.
//
// fetchTimeoutInSeconds is used both as the shared-lock acquisition timeout of
// InstallLockGuard and as the timeout passed to installFromFile.
// executeParams supplies the plugin arguments, the optional execution timeout
// override and the command options; it is dereferenced unconditionally, so it
// must not be nil.
//
// On a plugin-manager failure exitCode is the value produced by errProcess and
// err describes the failure. Otherwise exitCode and err are whatever
// pm.executePlugin returns. A plugin-execute metrics event is reported
// synchronously on every return path.
func (pm *PluginManager) executePluginFromFile(packagePath string, fetchTimeoutInSeconds int, executeParams *ExecuteParams) (exitCode int, err error) {
	const funcName = "ExecutePluginFromFile"
	log.GetLogger().Infof("Enter executePluginFromFile")
	var (
		// variables for metrics
		pluginName    string
		pluginVersion string
		resource      string = FetchFromLocalFile
		errorCode     string
		localArch     string
		pluginType    string
		// Environment variable to inject when executing the plugin.
		// If another version of the same-named plugin is already installed,
		// this is the directory of that existing plugin; otherwise it is empty.
		envPrePluginDir string
	)
	defer func() {
		metrics.GetPluginExecuteEvent(
			"pluginName", pluginName,
			"pluginVersion", pluginVersion,
			"pluginType", pluginType,
			// Where the plugin came from: a file, a local installation, or
			// fetched online.
			"resource", resource,
			"exitCode", fmt.Sprint(exitCode),
			// Error code defined by plugin-manager, e.g. PACKAGE_NOT_FOUND.
			// When non-empty the plugin was not executed correctly and
			// exitCode is a plugin-manager exit code; when empty the plugin
			// was invoked and exitCode is the plugin's own exit code.
			"errorCode", errorCode,
			"localArch", localArch,
			"localOsType", osutil.GetOsType(),
		).ReportEventSync()
	}()

	if pm.Verbose {
		fmt.Println("Execute plugin from file: ", packagePath)
	}
	// The error from GetArch is deliberately ignored here: localArch is only
	// compared against the manifest below and reported in metrics.
	localArch, _ = GetArch()
	exitCode = SUCCESS
	if !fileutil.CheckFileIsExist(packagePath) {
		err = errors.New("File not exist: " + packagePath)
		exitCode, errorCode = errProcess(funcName, PACKAGE_NOT_FOUND, err, "Package file not exist: "+packagePath)
		return
	}

	// Read the manifest straight out of the zip package.
	const compressedConfigPath = "config.json"
	var configBody []byte
	configBody, err = zipfile.PeekFile(packagePath, compressedConfigPath)
	if err != nil {
		switch {
		case errors.Is(err, zip.ErrFormat):
			errorMessage := "Package file isn`t a zip file: " + packagePath
			err = errors.New(errorMessage)
			exitCode, errorCode = errProcess(funcName, PACKAGE_FORMAT_ERR, err, errorMessage)
		case errors.Is(err, os.ErrNotExist):
			errorMessage := fmt.Sprintf("Manifest file config.json does not exist in %s.", packagePath)
			err = errors.New(errorMessage)
			exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, errorMessage)
		default:
			exitCode, errorCode = errProcess(funcName, UNZIP_ERR, err, fmt.Sprintf("Unzip err, file is [%s], target file is [%s], err is [%s]", packagePath, compressedConfigPath, err.Error()))
		}
		return
	}
	configContent := string(configBody)
	config := pluginConfig{}
	if err = fuzzyjson.Unmarshal(configContent, &config); err != nil {
		exitCode, errorCode = errProcess(funcName, UNMARSHAL_ERR, err, fmt.Sprintf("Unmarshal config.json err, config.json is [%s], err is [%s]", configContent, err.Error()))
		return
	}
	pluginName = config.Name
	pluginVersion = config.Version

	// Check that the OS type and architecture match this machine.
	if config.OsType != "" && strings.ToLower(config.OsType) != osutil.GetOsType() {
		err = errors.New("Plugin ostype not suit for this system")
		exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin ostype[%s] not suit for this system[%s]", config.OsType, osutil.GetOsType()))
		return
	}
	if config.Arch != "" && strings.ToLower(config.Arch) != "all" && strings.ToLower(config.Arch) != localArch {
		err = errors.New("Plugin arch not suit for this system")
		exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin arch[%s] not suit for this system[%s]", config.Arch, localArch))
		return
	}

	pluginIndex, plugin, err := getInstalledPluginByName(config.Name)
	if err != nil {
		exitCode, errorCode = errProcess(funcName, LOAD_INSTALLEDPLUGINS_ERR, err, "Load installed_plugins err: "+err.Error())
		return
	}
	if plugin != nil && plugin.IsRemoved {
		// The previous plugin with the same name has been removed, so this is
		// equivalent to a fresh installation.
		if err = deleteInstalledPluginByIndex(pluginIndex); err != nil {
			exitCode, errorCode = errProcess(funcName, DUMP_INSTALLEDPLUGINS_ERR, err, "Update installed_plugins file err: "+err.Error())
			return
		}
		pluginIndex = -1
		plugin = nil
	}
	if plugin != nil {
		envPrePluginDir = filepath.Join(pm.pluginRoot, plugin.Name, plugin.Version)
		// has installed, check version
		if versionutil.CompareVersion(config.Version, plugin.Version) <= 0 {
			if !pm.Yes {
				yn := ""
				// NOTE(review): the Scanln error is ignored, so this loop
				// presumably never terminates once stdin reaches EOF — confirm
				// and handle it where the io package is available.
				for {
					fmt.Printf("[%s %s] has installed, this package version[%s] is not newer, still install ? [y/n]: \n", plugin.Name, plugin.Version, config.Version)
					fmt.Scanln(&yn)
					if yn == "y" || yn == "n" {
						break
					}
				}
				if yn == "n" {
					// Cancelled by the user: exitCode stays SUCCESS and err
					// stays nil.
					log.GetLogger().Infoln("Execute plugin cancel...")
					fmt.Println("Execute plugin cancel...")
					return
				}
			}
			fmt.Printf("[%s %s] has installed, this package version[%s] is not newer, still install...\n", plugin.Name, plugin.Version, config.Version)
		} else {
			fmt.Printf("[%s %s] has installed, this package version[%s] is newer, keep install...\n", plugin.Name, plugin.Version, config.Version)
		}
	}

	// Acquire plugin-wise shared lock
	var pluginLockFile *os.File
	pluginLockFile, err = openPluginLockFile(config.Name)
	if err != nil {
		err = NewOpenPluginLockFileError(err)
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error()))
		return
	}
	defer pluginLockFile.Close()
	if err = filelock.TryRLock(pluginLockFile); err != nil {
		err = NewAcquirePluginExclusiveLockError(err)
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error()))
		return
	}
	// Deferred calls run in reverse order: the lock is released before the
	// lock file is closed.
	defer filelock.Unlock(pluginLockFile)

	var fetched *Fetched
	var exitingErr ExitingError
	fetchTimeout := time.Duration(fetchTimeoutInSeconds) * time.Second
	guardErr := InstallLockGuard{
		PluginName:               config.Name,
		PluginVersion:            config.Version,
		AcquireSharedLockTimeout: fetchTimeout,
	}.Do(func() {
		// Double-check if specified plugin has been installed by another
		// plugin-manager process, that releases plugin-version-wise
		// exclusive lock and then this acquired.
		fetched, exitingErr = pm.queryFromLocalOnly(config.Name, config.Version)
		if exitingErr == nil || !errors.Is(exitingErr, ErrPackageNotFound) {
			return
		}

		fetched, exitingErr = pm.installFromFile(packagePath, &config, plugin, pluginIndex, fetchTimeout)
	}, func() {
		// Presumably invoked when the guard did not run the install callback
		// above — confirm against InstallLockGuard.Do.
		fetched, exitingErr = pm.queryFromLocalOnly(config.Name, config.Version)
		// The lookup may fail and leave fetched nil; in that case skip the
		// assignment so the failure is reported through exitingErr below
		// instead of panicking on a nil pointer dereference.
		if fetched != nil {
			// TODO-FIXME: envPrePluginDir SHOULD be constructed on demand only
			fetched.EnvPrePluginDir = envPrePluginDir
		}
	})
	if guardErr != nil {
		err = guardErr
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error()))
		return
	}
	if exitingErr != nil {
		err = exitingErr.Unwrap()
		exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), err, exitingErr.Error())
		return
	}

	fmt.Printf("Plugin[%s] installed!\n", fetched.PluginName)
	pluginName = fetched.PluginName
	pluginVersion = fetched.PluginVersion
	pluginType = fetched.PluginType

	// An explicit timeout in executeParams overrides the fetched plugin's own
	// timeout.
	executionTimeoutInSeconds := fetched.ExecutionTimeoutInSeconds
	if executeParams.OptionalExecutionTimeoutInSeconds != nil {
		executionTimeoutInSeconds = *executeParams.OptionalExecutionTimeoutInSeconds
	}

	args := executeParams.SplitArgs()
	env := []string{
		"PLUGIN_DIR=" + fetched.EnvPluginDir,
		"PRE_PLUGIN_DIR=" + fetched.EnvPrePluginDir,
	}

	var options []process.CmdOption
	options, err = prepareCmdOptions(executeParams)
	if err != nil {
		exitCode, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options for plugin[%s %s]: %s", pluginName, pluginVersion, err.Error()))
		return
	}
	exitCode, errorCode, err = pm.executePlugin(fetched.Entrypoint, args, executionTimeoutInSeconds, env, false, options...)

	// Persistent and one-shot plugins: proactively report the plugin status
	// once after execution.
	if pluginType == PLUGIN_PERSIST {
		// reportErr is only logged; it must not replace the execution result
		// returned to the caller.
		status, reportErr := pm.CheckAndReportPlugin(pluginName, pluginVersion, fetched.Entrypoint, executionTimeoutInSeconds, env, "")
		log.GetLogger().WithFields(log.Fields{
			"pluginName":    pluginName,
			"pluginVersion": pluginVersion,
			"cmdPath":       fetched.Entrypoint,
			"timeout":       executionTimeoutInSeconds,
			"env":           env,
			"status":        status,
		}).WithError(reportErr).Infof("CheckAndReportPlugin")
	} else if pluginType == PLUGIN_ONCE {
		pm.ReportPluginStatus(pluginName, pluginVersion, ONCE_INSTALLED, "")
	}
	return
}