in agent/pluginmanager/acspluginmanager/acspluginmanager.go [634:781]
func (pm *PluginManager) executePluginOnlineOrLocal(fetchOptions *ExecFetchOptions, executeParams *ExecuteParams) (exitCode int, err error) {
const funcName = "ExecutePluginOnlineOrLocal"
log.GetLogger().Info("Enter executePluginOnlineOrLocal")
var (
// variables for metrics
resource string
errorCode string
localArch string
pluginName string = fetchOptions.PluginName
pluginVersion string = fetchOptions.Version
pluginType string = PLUGIN_ONCE
)
defer func() {
metrics.GetPluginExecuteEvent(
"pluginName", pluginName,
"pluginVersion", pluginVersion,
"pluginType", pluginType,
"resource", resource, // 插件来源,文件、本地已安装的、线上拉取的
"exitCode", fmt.Sprint(exitCode),
"errorCode", errorCode, // 错误码,plugin-manager定义的错误码,例如 PACKAGE_NOT_FOUND,如果这个字段非空表示插件没有正确执行,exitCode是plugin-manager定义的退出码;否则表示插件被正确调用,exitCode是插件执行的退出码
"localArch", localArch,
"localOsType", osutil.GetOsType(),
).ReportEventSync()
}()
localArch, _ = GetArch()
var fetched *Fetched
var queried *PluginInfo
var exitingErr ExitingError
if fetchOptions.Local {
// execute local plugin
fetched, exitingErr = pm.queryFromLocalOnly(fetchOptions.PluginName, fetchOptions.Version)
} else {
fetched, queried, exitingErr = pm.queryFromOnlineOrLocal(fetchOptions, localArch)
}
if exitingErr != nil {
err = exitingErr.Unwrap()
exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), exitingErr.Unwrap(), exitingErr.Error())
return
}
// Acquire plugin-wise shared lock
var pluginLockFile *os.File
pluginLockFile, err = openPluginLockFile(fetchOptions.PluginName)
if err != nil {
err = NewOpenPluginLockFileError(err)
exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", fetchOptions.PluginName, fetchOptions.Version, err.Error()))
return
}
defer pluginLockFile.Close()
if err = filelock.TryRLock(pluginLockFile); err != nil {
err = NewAcquirePluginExclusiveLockError(err)
exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", fetchOptions.PluginName, fetchOptions.Version, err.Error()))
return
}
defer filelock.Unlock(pluginLockFile)
if fetched != nil {
resource = FetchFromLocalInstalled
} else if queried != nil {
resource = FetchFromOnline
fetchTimeout := time.Duration(fetchOptions.FetchTimeoutInSeconds) * time.Second
guardErr := InstallLockGuard{
PluginName: queried.Name,
PluginVersion: queried.Version,
AcquireSharedLockTimeout: fetchTimeout,
}.Do(func() {
// Double-check if specified plugin has been installed by another
// plugin-manager process, that releases plugin-version-wise
// exclusive lock and then this acquired.
fetched, exitingErr = pm.queryFromLocalOnly(queried.Name, queried.Version)
if exitingErr == nil || !errors.Is(exitingErr, ErrPackageNotFound) {
return
}
fetched, exitingErr = pm.installFromOnline(queried, fetchTimeout, localArch)
}, func() {
fetched, exitingErr = pm.queryFromLocalOnly(queried.Name, queried.Version)
})
if guardErr != nil {
err = guardErr
exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", queried.Name, queried.Version, err.Error()))
return
}
if exitingErr != nil {
err = exitingErr.Unwrap()
exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), exitingErr.Unwrap(), exitingErr.Error())
return
}
} else {
// SHOULD NEVER RUN INTO THIS BRANCH
err = ErrPackageNotFound
exitCode, errorCode = errProcess(funcName, PACKAGE_NOT_FOUND, err, err.Error())
return
}
pluginName = fetched.PluginName
pluginVersion = fetched.PluginVersion
pluginType = fetched.PluginType
executionTimeoutInSeconds := fetched.ExecutionTimeoutInSeconds
if executeParams.OptionalExecutionTimeoutInSeconds != nil {
executionTimeoutInSeconds = *executeParams.OptionalExecutionTimeoutInSeconds
}
args := executeParams.SplitArgs()
env := []string{
"PLUGIN_DIR=" + fetched.EnvPluginDir,
"PRE_PLUGIN_DIR=" + fetched.EnvPrePluginDir,
}
var options []process.CmdOption
options, err = prepareCmdOptions(executeParams)
if err != nil {
exitCode, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options for plugin[%s %s]: %s", pluginName, pluginVersion, err.Error()))
return
}
exitCode, errorCode, err = pm.executePlugin(fetched.Entrypoint, args, executionTimeoutInSeconds, env, false, options...)
// 常驻插件:如果是从线上拉取的插件,或者调用的接口有可能改变插件状态,需要主动上报一次插件状态
// 一次性插件:如果插件是从线上拉取的,需要上报一次插件状态
sysTagType := ""
if pluginType == PLUGIN_PERSIST && (needReportStatus(args) || resource == FetchFromOnline) {
if resource == FetchFromOnline {
if fetched.AddSysTag {
sysTagType = AddSysTag
} else if len(fetched.EnvPrePluginDir) != 0 {
sysTagType = RemoveSysTag
}
}
status, err := pm.CheckAndReportPlugin(pluginName, pluginVersion, fetched.Entrypoint, executionTimeoutInSeconds, env, sysTagType)
log.GetLogger().WithFields(log.Fields{
"pluginName": pluginName,
"pluginVersion": pluginVersion,
"cmdPath": fetched.Entrypoint,
"timeout": executionTimeoutInSeconds,
"env": env,
"status": status,
}).WithError(err).Infof("CheckAndReportPlugin")
} else if pluginType == PLUGIN_ONCE && resource == FetchFromOnline {
if fetched.AddSysTag {
sysTagType = AddSysTag
} else if len(fetched.EnvPrePluginDir) != 0 {
sysTagType = RemoveSysTag
}
pm.ReportPluginStatus(pluginName, pluginVersion, ONCE_INSTALLED, sysTagType)
}
return
}