func()

in agent/pluginmanager/acspluginmanager/acspluginmanager.go [634:781]


// executePluginOnlineOrLocal resolves the plugin described by fetchOptions,
// installs it from online when only online metadata is available, and then
// runs its entrypoint using executeParams.
//
// Flow:
//  1. Look the plugin up: local only when fetchOptions.Local is set,
//     otherwise through queryFromOnlineOrLocal.
//  2. Open the plugin lock file and try to take the plugin-wise shared lock;
//     unlock and close are deferred until the function returns.
//  3. When the lookup produced only online metadata, install (or re-query)
//     the plugin under InstallLockGuard.
//  4. Execute the entrypoint, then report plugin status depending on the
//     plugin type and on whether the plugin was fetched from online.
//
// It returns the exit code and error of the run. When the plugin could not be
// executed, both values come from errProcess and the underlying failure.
// A plugin-execute metrics event is reported synchronously from a deferred
// call on every return path.
func (pm *PluginManager) executePluginOnlineOrLocal(fetchOptions *ExecFetchOptions, executeParams *ExecuteParams) (exitCode int, err error) {
	const funcName = "ExecutePluginOnlineOrLocal"
	log.GetLogger().Info("Enter executePluginOnlineOrLocal")

	// Values consumed by the deferred metrics event. The plugin* variables
	// start from the request and are overwritten with the resolved plugin's
	// values once it is known.
	var (
		resource, errorCode, localArch string

		pluginName    string = fetchOptions.PluginName
		pluginVersion string = fetchOptions.Version
		pluginType    string = PLUGIN_ONCE
	)

	// failExiting stores an ExitingError as the function result.
	failExiting := func(e ExitingError) {
		err = e.Unwrap()
		exitCode, errorCode = errProcess(funcName, e.ExitCode(), e.Unwrap(), e.Error())
	}
	// failLocking stores the current err as a LOCKING_ERR result for the
	// given plugin name and version.
	failLocking := func(name, version string) {
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", name, version, err.Error()))
	}

	defer func() {
		metrics.GetPluginExecuteEvent(
			"pluginName", pluginName,
			"pluginVersion", pluginVersion,
			"pluginType", pluginType,
			// Plugin source: file, locally installed, or fetched online.
			"resource", resource,
			"exitCode", fmt.Sprint(exitCode),
			// Error code defined by plugin-manager, e.g. PACKAGE_NOT_FOUND.
			// Non-empty means the plugin was not executed correctly and
			// exitCode is a plugin-manager exit code; empty means the plugin
			// was invoked and exitCode is the plugin's own exit code.
			"errorCode", errorCode,
			"localArch", localArch,
			"localOsType", osutil.GetOsType(),
		).ReportEventSync()
	}()
	localArch, _ = GetArch()

	// Step 1: resolve the plugin.
	var (
		plugin  *Fetched
		remote  *PluginInfo
		exitErr ExitingError
	)
	if !fetchOptions.Local {
		plugin, remote, exitErr = pm.queryFromOnlineOrLocal(fetchOptions, localArch)
	} else {
		// Only locally available plugins may be executed.
		plugin, exitErr = pm.queryFromLocalOnly(fetchOptions.PluginName, fetchOptions.Version)
	}
	if exitErr != nil {
		failExiting(exitErr)
		return exitCode, err
	}

	// Step 2: acquire the plugin-wise shared lock.
	var lockFile *os.File
	lockFile, err = openPluginLockFile(fetchOptions.PluginName)
	if err != nil {
		err = NewOpenPluginLockFileError(err)
		failLocking(fetchOptions.PluginName, fetchOptions.Version)
		return exitCode, err
	}
	defer lockFile.Close()
	err = filelock.TryRLock(lockFile)
	if err != nil {
		// NOTE(review): a shared (read) lock is requested here but the error
		// constructor is named "Exclusive" - confirm this is intended.
		err = NewAcquirePluginExclusiveLockError(err)
		failLocking(fetchOptions.PluginName, fetchOptions.Version)
		return exitCode, err
	}
	defer filelock.Unlock(lockFile)

	// Step 3: make sure an installed copy is available.
	switch {
	case plugin != nil:
		resource = FetchFromLocalInstalled

	case remote != nil:
		resource = FetchFromOnline
		installTimeout := time.Duration(fetchOptions.FetchTimeoutInSeconds) * time.Second
		guard := InstallLockGuard{
			PluginName:               remote.Name,
			PluginVersion:            remote.Version,
			AcquireSharedLockTimeout: installTimeout,
		}
		// NOTE(review): presumably the first callback runs while this process
		// holds the plugin-version-wise exclusive lock and the second when
		// only the shared lock was obtained - confirm against
		// InstallLockGuard.Do.
		guardErr := guard.Do(func() {
			// Double-check whether another plugin-manager process installed
			// the plugin and released the plugin-version-wise exclusive lock
			// before this process acquired it. Install only when the local
			// lookup reports that the package is missing.
			plugin, exitErr = pm.queryFromLocalOnly(remote.Name, remote.Version)
			if exitErr != nil && errors.Is(exitErr, ErrPackageNotFound) {
				plugin, exitErr = pm.installFromOnline(remote, installTimeout, localArch)
			}
		}, func() {
			plugin, exitErr = pm.queryFromLocalOnly(remote.Name, remote.Version)
		})
		if guardErr != nil {
			err = guardErr
			failLocking(remote.Name, remote.Version)
			return exitCode, err
		}
		if exitErr != nil {
			failExiting(exitErr)
			return exitCode, err
		}

	default:
		// SHOULD NEVER RUN INTO THIS BRANCH
		err = ErrPackageNotFound
		exitCode, errorCode = errProcess(funcName, PACKAGE_NOT_FOUND, err, err.Error())
		return exitCode, err
	}

	// From here on the metrics event describes the resolved plugin.
	pluginName = plugin.PluginName
	pluginVersion = plugin.PluginVersion
	pluginType = plugin.PluginType

	// Step 4: execute. A caller-supplied timeout overrides the plugin's own.
	timeoutSec := plugin.ExecutionTimeoutInSeconds
	if override := executeParams.OptionalExecutionTimeoutInSeconds; override != nil {
		timeoutSec = *override
	}
	argv := executeParams.SplitArgs()
	env := []string{
		"PLUGIN_DIR=" + plugin.EnvPluginDir,
		"PRE_PLUGIN_DIR=" + plugin.EnvPrePluginDir,
	}
	var cmdOpts []process.CmdOption
	cmdOpts, err = prepareCmdOptions(executeParams)
	if err != nil {
		exitCode, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options for plugin[%s %s]: %s", pluginName, pluginVersion, err.Error()))
		return exitCode, err
	}
	exitCode, errorCode, err = pm.executePlugin(plugin.Entrypoint, argv, timeoutSec, env, false, cmdOpts...)

	// The sys tag action is only chosen for plugins fetched from online.
	fromOnline := resource == FetchFromOnline
	sysTagType := ""
	if fromOnline {
		switch {
		case plugin.AddSysTag:
			sysTagType = AddSysTag
		case len(plugin.EnvPrePluginDir) != 0:
			sysTagType = RemoveSysTag
		}
	}

	// Persistent plugin: report status when the plugin was fetched from
	// online or when the invoked interface may change the plugin state.
	// One-shot plugin: report status once when it was fetched from online.
	switch {
	case pluginType == PLUGIN_PERSIST && (needReportStatus(argv) || fromOnline):
		// The report error is only logged; it does not replace the result of
		// the plugin execution.
		status, reportErr := pm.CheckAndReportPlugin(pluginName, pluginVersion, plugin.Entrypoint, timeoutSec, env, sysTagType)
		log.GetLogger().WithFields(log.Fields{
			"pluginName":    pluginName,
			"pluginVersion": pluginVersion,
			"cmdPath":       plugin.Entrypoint,
			"timeout":       timeoutSec,
			"env":           env,
			"status":        status,
		}).WithError(reportErr).Infof("CheckAndReportPlugin")
	case pluginType == PLUGIN_ONCE && fromOnline:
		pm.ReportPluginStatus(pluginName, pluginVersion, ONCE_INSTALLED, sysTagType)
	}
	return exitCode, err
}