func RunPlugins()

in agent/framework/runpluginutil/runpluginutil.go [98:281]


// RunPlugins walks plugins in order, executes every plugin that has not finished
// yet, and publishes the outcome of each processed plugin on resChan.
//
// For every plugin the stored result is copied, its status is normalised
// (empty -> NotStarted, SuccessAndReboot -> InProgress), and then
// getStepExecutionOperation decides whether the step is executed, skipped or
// failed. Plugins whose stored status is Failed are re-sent on resChan without
// being run again; plugins in any other finished status are skipped silently.
// Processing stops after a plugin with a registered handler reports
// SuccessAndReboot.
//
// Parameters:
//   - context: supplies the logger and is forwarded to the helpers.
//   - plugins: the ordered plugin states to process.
//   - ioConfig: S3 and CloudWatch output settings; the CloudWatch log stream
//     prefix is rebuilt from the original prefix for every plugin.
//   - upstreamServiceName: copied into each plugin's configuration.
//   - registry: maps a plugin name to the factory handed to runPlugin.
//   - resChan: receives one result per processed plugin. Sends are plain
//     blocking sends; the caller is expected to size the buffer to the number
//     of plugins.
//   - cancelFlag: forwarded to runPlugin.
//
// It returns the results keyed by plugin ID for every plugin that was visited.
// A panic raised while processing is recovered and logged; in that case the
// partially filled map is returned and the orchestration directory cleanup is
// not performed.
func RunPlugins(
	context context.T,
	plugins []contracts.PluginState,
	ioConfig contracts.IOConfiguration,
	upstreamServiceName contracts.UpstreamServiceName,
	registry PluginRegistry,
	resChan chan contracts.PluginResult,
	cancelFlag task.CancelFlag,
) (pluginOutputs map[string]*contracts.PluginResult) {

	pluginOutputs = make(map[string]*contracts.PluginResult)

	// Contains the logStreamPrefix without the pluginID.
	logStreamPrefix := ioConfig.CloudWatchConfig.LogStreamPrefix
	// Log stream names cannot contain ':' or '*'; both are replaced with '-'.
	logStreamSanitizer := strings.NewReplacer(":", "-", "*", "-")
	log := context.Log()

	// pluginOutputs is a named result so that the map filled so far is still
	// returned when a panic is recovered here.
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Run plugins panic: \n%v", r)
			log.Errorf("Stacktrace:\n%s", debug.Stack())
		}
	}()

	for pluginIndex, pluginState := range plugins {
		pluginID := pluginState.Id     // the identifier of the plugin
		pluginName := pluginState.Name // the name of the plugin

		// output is this iteration's own copy of the stored result. The map
		// holds a pointer to it, so every update below is visible through
		// pluginOutputs as well.
		output := pluginState.Result
		output.PluginID = pluginID
		output.PluginName = pluginName
		pluginOutputs[pluginID] = &output

		log.Debugf("Checking Status for plugin %s - %s", pluginName, output.Status)
		switch output.Status {
		//TODO properly initialize the plugin status
		case "":
			log.Debugf("plugin - %v has empty state, initialize as NotStarted",
				pluginName)
			output.StartDateTime = time.Now()
			output.Status = contracts.ResultStatusNotStarted

		case contracts.ResultStatusNotStarted, contracts.ResultStatusInProgress:
			log.Debugf("plugin - %v status %v",
				pluginName,
				output.Status)
			output.StartDateTime = time.Now()

		case contracts.ResultStatusSuccessAndReboot:
			log.Debugf("plugin - %v just experienced reboot, reset to InProgress...",
				pluginName)
			output.Status = contracts.ResultStatusInProgress

		case contracts.ResultStatusFailed:
			// Already failed in an earlier run: report the stored result
			// again, but do not execute the plugin a second time.
			log.Debugf("plugin - %v already executed with failed status, skipping...",
				pluginName)
			resChan <- output
			continue

		default:
			log.Debugf("plugin - %v already executed, skipping...",
				pluginName)
			continue
		}

		log.Debugf("Executing plugin - %v", pluginName)

		// populate the upstream service name on this plugin's configuration
		configuration := pluginState.Configuration
		configuration.UpstreamServiceName = upstreamServiceName

		if ioConfig.OutputS3BucketName != "" {
			output.OutputS3BucketName = ioConfig.OutputS3BucketName
			if ioConfig.OutputS3KeyPrefix != "" {
				output.OutputS3KeyPrefix = fileutil.BuildS3Path(ioConfig.OutputS3KeyPrefix, pluginName)
			}
		}

		// Append pluginID to the original logStreamPrefix and sanitize the
		// whole value in a single pass.
		if ioConfig.CloudWatchConfig.LogGroupName != "" {
			ioConfig.CloudWatchConfig.LogStreamPrefix = logStreamSanitizer.Replace(
				fmt.Sprintf("%s/%s", logStreamPrefix, pluginID))
		}

		// r stays at its zero value unless the step is actually executed.
		var r contracts.PluginResult

		pluginFactory, pluginHandlerFound := registry[pluginName]
		// The third value returned by isSupportedPlugin is not used here.
		isKnown, isSupported, _ := isSupportedPlugin(log, pluginName)

		// checking if a prior step returned exit codes 168 or 169 to exit document.
		// If so we need to skip every other step
		shouldSkipStepDueToPriorFailedStep := getShouldPluginSkipBasedOnControlFlow(
			context,
			plugins,
			pluginIndex,
			pluginOutputs,
		)

		operation, logMessage := getStepExecutionOperation(
			log,
			pluginName,
			pluginID,
			isKnown,
			isSupported,
			pluginHandlerFound,
			configuration.IsPreconditionEnabled,
			configuration.Preconditions,
			shouldSkipStepDueToPriorFailedStep)

		switch operation {
		case executeStep:
			log.Infof("Running plugin %s %s", pluginName, pluginID)
			r = runPlugin(context, pluginFactory, pluginName, configuration, cancelFlag, ioConfig)
			output.Code = r.Code
			output.Status = r.Status
			output.Error = r.Error
			output.StandardError = r.StandardError
			output.StandardOutput = r.StandardOutput
			output.Output = r.Output
			output.StepName = r.StepName

			onFailureProp := getStringPropByName(pluginState.Configuration.Properties, contracts.OnFailureModifier)
			hasOnFailureProp := onFailureProp == contracts.ModifierValueExit ||
				onFailureProp == contracts.ModifierValueSuccessAndExit

			// The cases are mutually exclusive and evaluated in this order.
			switch {
			case output.Code == contracts.ExitWithSuccess:
				outputAddition := "\nStep exited with code 168. Therefore, marking step as succeeded. Further document steps will be skipped."
				output.Status = contracts.ResultStatusSuccess
				output.Error = ""
				output.StandardError = ""
				output.StandardOutput = r.StandardOutput + outputAddition

			case output.Code == contracts.ExitWithFailure:
				// NOTE(review): Status is left as reported by the plugin; it is
				// not set to Failed here even though the message says so.
				outputAddition := "\nStep exited with code 169. Therefore, marking step as Failed. Further document steps will be skipped."
				output.StandardError = r.StandardError + outputAddition
				output.StandardOutput = r.StandardOutput + outputAddition

			case output.Status == contracts.ResultStatusFailed && hasOnFailureProp:
				outputAddition := "\nStep was found to have onFailure property. Further document steps will be skipped."
				output.StandardError = r.StandardError + outputAddition
				output.StandardOutput = r.StandardOutput + outputAddition
				if onFailureProp == contracts.ModifierValueSuccessAndExit {
					output.Status = contracts.ResultStatusSuccess
					output.Code = contracts.ExitWithSuccess
				}
			}

		case skipStep:
			log.Info(logMessage)
			output.Status = contracts.ResultStatusSkipped
			output.Code = 0
			output.Output = logMessage

		case failStep:
			// logMessage is runtime text, so it must be passed as an argument
			// and never used as the format string itself.
			err := fmt.Errorf("%s", logMessage)
			output.Status = contracts.ResultStatusFailed
			output.Error = err.Error()
			log.Error(err)

		default:
			err := fmt.Errorf("Unknown error, Operation: %s, Plugin name: %s", operation, pluginName)
			output.Status = contracts.ResultStatusFailed
			output.Error = err.Error()
			log.Error(err)
		}

		// set end time.
		output.EndDateTime = time.Now()
		log.Infof("Sending plugin %v completion message", pluginID)

		// Truncate a copy of the result for the channel; the entry kept in
		// pluginOutputs retains the untruncated output.
		result := output
		pluginConfig := iohandler.DefaultOutputConfig()
		result.StandardOutput = pluginutil.StringPrefix(result.StandardOutput, pluginConfig.MaxStdoutLength, pluginConfig.OutputTruncatedSuffix)
		// NOTE(review): stderr is truncated with MaxStdoutLength as well; confirm
		// whether a dedicated stderr limit should be used instead.
		result.StandardError = pluginutil.StringPrefix(result.StandardError, pluginConfig.MaxStdoutLength, pluginConfig.OutputTruncatedSuffix)
		// send to buffer channel, guaranteed to not block since buffer size is plugin number
		resChan <- result

		//TODO handle cancelFlag here
		if pluginHandlerFound && r.Status == contracts.ResultStatusSuccessAndReboot {
			// do not execute the next plugin
			break
		}
	}

	// this will clean the orchestration folder for the successful and failed document executions only when the agent is configured
	orchestrationDirCleanup(context, len(plugins), pluginOutputs, ioConfig.OrchestrationDirectory)
	return pluginOutputs
}