func()

in agent/pluginmanager/acspluginmanager/acspluginmanager.go [325:531]


func (pm *PluginManager) executePluginFromFile(packagePath string, fetchTimeoutInSeconds int, executeParams *ExecuteParams) (exitCode int, err error) {
	const funcName = "ExecutePluginFromFile"
	log.GetLogger().Infof("Enter executePluginFromFile")
	var (
		// variables for metrics
		pluginName    string
		pluginVersion string
		resource      string = FetchFromLocalFile
		errorCode     string
		localArch     string
		pluginType    string

		// 执行插件时要注入的环境变量
		envPrePluginDir string // 如果已有同名的其他版本插件,表示原有同名插件的执行目录;否则为空
	)
	defer func() {
		metrics.GetPluginExecuteEvent(
			"pluginName", pluginName,
			"pluginVersion", pluginVersion,
			"pluginType", pluginType,
			"resource", resource, // 插件来源,文件、本地已安装的、线上拉取的
			"exitCode", fmt.Sprint(exitCode),
			"errorCode", errorCode, // 错误码,plugin-manager定义的错误码,例如 PACKAGE_NOT_FOUND,如果这个字段非空表示插件没有正确执行,exitCode是plugin-manager定义的退出码;否则表示插件被正确调用,exitCode是插件执行的退出码
			"localArch", localArch,
			"localOsType", osutil.GetOsType(),
		).ReportEventSync()
	}()
	if pm.Verbose {
		fmt.Println("Execute plugin from file: ", packagePath)
	}
	localArch, _ = GetArch()
	exitCode = SUCCESS
	if !fileutil.CheckFileIsExist(packagePath) {
		err = errors.New("File not exist: " + packagePath)
		exitCode, errorCode = errProcess(funcName, PACKAGE_NOT_FOUND, err, "Package file not exist: "+packagePath)
		return
	}

	const compressedConfigPath = "config.json"
	var configBody []byte
	configBody, err = zipfile.PeekFile(packagePath, compressedConfigPath)
	if err != nil {
		if errors.Is(err, zip.ErrFormat) {
			errorMessage := "Package file isn`t a zip file: " + packagePath
			err = errors.New(errorMessage)
			exitCode, errorCode = errProcess(funcName, PACKAGE_FORMAT_ERR, err, errorMessage)
			return
		} else if errors.Is(err, os.ErrNotExist) {
			errorMessage := fmt.Sprintf("Manifest file config.json does not exist in %s.", packagePath)
			err = errors.New(errorMessage)
			exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, errorMessage)
			return
		} else {
			exitCode, errorCode = errProcess(funcName, UNZIP_ERR, err, fmt.Sprintf("Unzip err, file is [%s], target file is [%s], err is [%s]", packagePath, compressedConfigPath, err.Error()))
			return
		}
	}

	configContent := string(configBody)
	config := pluginConfig{}
	if err = fuzzyjson.Unmarshal(configContent, &config); err != nil {
		exitCode, errorCode = errProcess(funcName, UNMARSHAL_ERR, err, fmt.Sprintf("Unmarshal config.json err, config.json is [%s], err is [%s]", configContent, err.Error()))
		return
	}

	pluginName = config.Name
	pluginVersion = config.Version
	// 检查系统类型和架构是否符合
	if config.OsType != "" && strings.ToLower(config.OsType) != osutil.GetOsType() {
		err = errors.New("Plugin ostype not suit for this system")
		exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin ostype[%s] not suit for this system[%s]", config.OsType, osutil.GetOsType()))
		return
	}
	if config.Arch != "" && strings.ToLower(config.Arch) != "all" && strings.ToLower(config.Arch) != localArch {
		err = errors.New("Plugin arch not suit for this system")
		exitCode, errorCode = errProcess(funcName, PLUGIN_FORMAT_ERR, err, fmt.Sprintf("Plugin arch[%s] not suit for this system[%s]", config.Arch, localArch))
		return
	}

	pluginIndex, plugin, err := getInstalledPluginByName(config.Name)
	if err != nil {
		exitCode, errorCode = errProcess(funcName, LOAD_INSTALLEDPLUGINS_ERR, err, "Load installed_plugins err: "+err.Error())
		return
	}
	if plugin != nil && plugin.IsRemoved {
		// 之前的同名插件已经被删除,相当于重新安装
		if err = deleteInstalledPluginByIndex(pluginIndex); err != nil {
			exitCode, errorCode = errProcess(funcName, DUMP_INSTALLEDPLUGINS_ERR, err, "Update installed_plugins file err: "+err.Error())
			return
		}

		pluginIndex = -1
		plugin = nil
	}
	if plugin != nil {
		envPrePluginDir = filepath.Join(pm.pluginRoot, plugin.Name, plugin.Version)
		// has installed, check version
		if versionutil.CompareVersion(config.Version, plugin.Version) <= 0 {
			if !pm.Yes {
				yn := ""
				for {
					fmt.Printf("[%s %s] has installed, this package version[%s] is not newer, still install ? [y/n]: \n", plugin.Name, plugin.Version, config.Version)
					fmt.Scanln(&yn)
					if yn == "y" || yn == "n" {
						break
					}
				}
				if yn == "n" {
					log.GetLogger().Infoln("Execute plugin cancel...")
					fmt.Println("Execute plugin cancel...")
					return
				}
			}
			fmt.Printf("[%s %s] has installed, this package version[%s] is not newer, still install...\n", plugin.Name, plugin.Version, config.Version)
		} else {
			fmt.Printf("[%s %s] has installed, this package version[%s] is newer, keep install...\n", plugin.Name, plugin.Version, config.Version)
		}
	}

	// Acquire plugin-wise shared lock
	var pluginLockFile *os.File
	pluginLockFile, err = openPluginLockFile(config.Name)
	if err != nil {
		err = NewOpenPluginLockFileError(err)
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error()))
		return
	}
	defer pluginLockFile.Close()
	if err = filelock.TryRLock(pluginLockFile); err != nil {
		err = NewAcquirePluginExclusiveLockError(err)
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error()))
		return
	}
	defer filelock.Unlock(pluginLockFile)

	var fetched *Fetched
	var exitingErr ExitingError
	fetchTimeout := time.Duration(fetchTimeoutInSeconds) * time.Second
	guardErr := InstallLockGuard{
		PluginName:    config.Name,
		PluginVersion: config.Version,

		AcquireSharedLockTimeout: fetchTimeout,
	}.Do(func() {
		// Double-check if specified plugin has been installed by another
		// plugin-manager process, that releases plugin-version-wise
		// exclusive lock and then this acquired.
		fetched, exitingErr = pm.queryFromLocalOnly(config.Name, config.Version)
		if exitingErr == nil || !errors.Is(exitingErr, ErrPackageNotFound) {
			return
		}

		fetched, exitingErr = pm.installFromFile(packagePath, &config, plugin, pluginIndex, fetchTimeout)
	}, func() {
		fetched, exitingErr = pm.queryFromLocalOnly(config.Name, config.Version)
		// TODO-FIXME: envPrePluginDir SHOULD be constructed on demand only
		fetched.EnvPrePluginDir = envPrePluginDir
	})
	if guardErr != nil {
		err = guardErr
		exitCode, errorCode = errProcess(funcName, LOCKING_ERR, err, fmt.Sprintf("Failed to execute plugin[%s %s]: %s", config.Name, config.Version, err.Error()))
		return
	}
	if exitingErr != nil {
		err = exitingErr.Unwrap()
		exitCode, errorCode = errProcess(funcName, exitingErr.ExitCode(), exitingErr.Unwrap(), exitingErr.Error())
		return
	}
	fmt.Printf("Plugin[%s] installed!\n", fetched.PluginName)

	pluginName = fetched.PluginName
	pluginVersion = fetched.PluginVersion
	pluginType = fetched.PluginType

	executionTimeoutInSeconds := fetched.ExecutionTimeoutInSeconds
	if executeParams.OptionalExecutionTimeoutInSeconds != nil {
		executionTimeoutInSeconds = *executeParams.OptionalExecutionTimeoutInSeconds
	}

	args := executeParams.SplitArgs()
	env := []string{
		"PLUGIN_DIR=" + fetched.EnvPluginDir,
		"PRE_PLUGIN_DIR=" + fetched.EnvPrePluginDir,
	}
	var options []process.CmdOption
	options, err = prepareCmdOptions(executeParams)
	if err != nil {
		exitCode, errorCode = errProcess(funcName, EXECUTE_FAILED, err, fmt.Sprintf("Failed to set execution options for plugin[%s %s]: %s", pluginName, pluginVersion, err.Error()))
		return
	}
	exitCode, errorCode, err = pm.executePlugin(fetched.Entrypoint, args, executionTimeoutInSeconds, env, false, options...)
	// 常驻插件和一次性插件:需要主动上报一次插件状态
	if pluginType == PLUGIN_PERSIST {
		status, err := pm.CheckAndReportPlugin(pluginName, pluginVersion, fetched.Entrypoint, executionTimeoutInSeconds, env, "")
		log.GetLogger().WithFields(log.Fields{
			"pluginName":    pluginName,
			"pluginVersion": pluginVersion,
			"cmdPath":       fetched.Entrypoint,
			"timeout":       executionTimeoutInSeconds,
			"env":           env,
			"status":        status,
		}).WithError(err).Infof("CheckAndReportPlugin")
	} else if pluginType == PLUGIN_ONCE {
		pm.ReportPluginStatus(pluginName, pluginVersion, ONCE_INSTALLED, "")
	}
	return
}