in agent/framework/runpluginutil/runpluginutil.go [98:281]
func RunPlugins(
context context.T,
plugins []contracts.PluginState,
ioConfig contracts.IOConfiguration,
upstreamServiceName contracts.UpstreamServiceName,
registry PluginRegistry,
resChan chan contracts.PluginResult,
cancelFlag task.CancelFlag,
) (pluginOutputs map[string]*contracts.PluginResult) {
pluginOutputs = make(map[string]*contracts.PluginResult)
//Contains the logStreamPrefix without the pluginID
logStreamPrefix := ioConfig.CloudWatchConfig.LogStreamPrefix
log := context.Log()
defer func() {
if r := recover(); r != nil {
log.Errorf("Run plugins panic: \n%v", r)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
for pluginIndex, pluginState := range plugins {
pluginID := pluginState.Id // the identifier of the plugin
pluginName := pluginState.Name // the name of the plugin
pluginOutput := pluginState.Result
pluginOutput.PluginID = pluginID
pluginOutput.PluginName = pluginName
pluginOutputs[pluginID] = &pluginOutput
log.Debugf("Checking Status for plugin %s - %s", pluginName, pluginOutput.Status)
switch pluginOutput.Status {
//TODO properly initialize the plugin status
case "":
log.Debugf("plugin - %v has empty state, initialize as NotStarted",
pluginName)
pluginOutput.StartDateTime = time.Now()
pluginOutput.Status = contracts.ResultStatusNotStarted
case contracts.ResultStatusNotStarted, contracts.ResultStatusInProgress:
log.Debugf("plugin - %v status %v",
pluginName,
pluginOutput.Status)
pluginOutput.StartDateTime = time.Now()
case contracts.ResultStatusSuccessAndReboot:
log.Debugf("plugin - %v just experienced reboot, reset to InProgress...",
pluginName)
pluginOutput.Status = contracts.ResultStatusInProgress
case contracts.ResultStatusFailed:
log.Debugf("plugin - %v already executed with failed status, skipping...",
pluginName)
resChan <- *pluginOutputs[pluginID]
continue
default:
log.Debugf("plugin - %v already executed, skipping...",
pluginName)
continue
}
log.Debugf("Executing plugin - %v", pluginName)
// populate plugin start time, status, and upstream service name
configuration := pluginState.Configuration
configuration.UpstreamServiceName = upstreamServiceName
if ioConfig.OutputS3BucketName != "" {
pluginOutputs[pluginID].OutputS3BucketName = ioConfig.OutputS3BucketName
if ioConfig.OutputS3KeyPrefix != "" {
pluginOutputs[pluginID].OutputS3KeyPrefix = fileutil.BuildS3Path(ioConfig.OutputS3KeyPrefix, pluginName)
}
}
//Append pluginID to logStreamPrefix. Replace ':' or '*' with '-' since LogStreamNames cannot have those characters
if ioConfig.CloudWatchConfig.LogGroupName != "" {
ioConfig.CloudWatchConfig.LogStreamPrefix = fmt.Sprintf("%s/%s", logStreamPrefix, pluginID)
ioConfig.CloudWatchConfig.LogStreamPrefix = strings.Replace(ioConfig.CloudWatchConfig.LogStreamPrefix, ":", "-", -1)
ioConfig.CloudWatchConfig.LogStreamPrefix = strings.Replace(ioConfig.CloudWatchConfig.LogStreamPrefix, "*", "-", -1)
}
var (
r contracts.PluginResult
pluginFactory PluginFactory
pluginHandlerFound bool
isKnown bool
isSupported bool
)
pluginFactory, pluginHandlerFound = registry[pluginName]
isKnown, isSupported, _ = isSupportedPlugin(log, pluginName)
// checking if a prior step returned exit codes 168 or 169 to exit document.
// If so we need to skip every other step
shouldSkipStepDueToPriorFailedStep := getShouldPluginSkipBasedOnControlFlow(
context,
plugins,
pluginIndex,
pluginOutputs,
)
operation, logMessage := getStepExecutionOperation(
log,
pluginName,
pluginID,
isKnown,
isSupported,
pluginHandlerFound,
configuration.IsPreconditionEnabled,
configuration.Preconditions,
shouldSkipStepDueToPriorFailedStep)
switch operation {
case executeStep:
log.Infof("Running plugin %s %s", pluginName, pluginID)
r = runPlugin(context, pluginFactory, pluginName, configuration, cancelFlag, ioConfig)
pluginOutputs[pluginID].Code = r.Code
pluginOutputs[pluginID].Status = r.Status
pluginOutputs[pluginID].Error = r.Error
pluginOutputs[pluginID].StandardError = r.StandardError
pluginOutputs[pluginID].StandardOutput = r.StandardOutput
pluginOutputs[pluginID].Output = r.Output
pluginOutputs[pluginID].StepName = r.StepName
onFailureProp := getStringPropByName(pluginState.Configuration.Properties, contracts.OnFailureModifier)
hasOnFailureProp := onFailureProp == contracts.ModifierValueExit || onFailureProp == contracts.ModifierValueSuccessAndExit
outputAddition := ""
if pluginOutputs[pluginID].Code == contracts.ExitWithSuccess {
outputAddition = "\nStep exited with code 168. Therefore, marking step as succeeded. Further document steps will be skipped."
pluginOutputs[pluginID].Status = contracts.ResultStatusSuccess
pluginOutputs[pluginID].Error = ""
pluginOutputs[pluginID].StandardError = ""
pluginOutputs[pluginID].StandardOutput = r.StandardOutput + outputAddition
} else if pluginOutputs[pluginID].Code == contracts.ExitWithFailure {
outputAddition = "\nStep exited with code 169. Therefore, marking step as Failed. Further document steps will be skipped."
pluginOutputs[pluginID].StandardError = r.StandardError + outputAddition
pluginOutputs[pluginID].StandardOutput = r.StandardOutput + outputAddition
} else if pluginOutputs[pluginID].Status == contracts.ResultStatusFailed && hasOnFailureProp {
outputAddition = "\nStep was found to have onFailure property. Further document steps will be skipped."
pluginOutputs[pluginID].StandardError = r.StandardError + outputAddition
pluginOutputs[pluginID].StandardOutput = r.StandardOutput + outputAddition
if onFailureProp == contracts.ModifierValueSuccessAndExit {
pluginOutputs[pluginID].Status = contracts.ResultStatusSuccess
pluginOutputs[pluginID].Code = contracts.ExitWithSuccess
}
}
case skipStep:
log.Info(logMessage)
pluginOutputs[pluginID].Status = contracts.ResultStatusSkipped
pluginOutputs[pluginID].Code = 0
pluginOutputs[pluginID].Output = logMessage
case failStep:
err := fmt.Errorf(logMessage)
pluginOutputs[pluginID].Status = contracts.ResultStatusFailed
pluginOutputs[pluginID].Error = err.Error()
log.Error(err)
default:
err := fmt.Errorf("Unknown error, Operation: %s, Plugin name: %s", operation, pluginName)
pluginOutputs[pluginID].Status = contracts.ResultStatusFailed
pluginOutputs[pluginID].Error = err.Error()
log.Error(err)
}
// set end time.
pluginOutputs[pluginID].EndDateTime = time.Now()
log.Infof("Sending plugin %v completion message", pluginID)
// truncate the result and send it back to buffer channel.
result := *pluginOutputs[pluginID]
pluginConfig := iohandler.DefaultOutputConfig()
result.StandardOutput = pluginutil.StringPrefix(result.StandardOutput, pluginConfig.MaxStdoutLength, pluginConfig.OutputTruncatedSuffix)
result.StandardError = pluginutil.StringPrefix(result.StandardError, pluginConfig.MaxStdoutLength, pluginConfig.OutputTruncatedSuffix)
// send to buffer channel, guaranteed to not block since buffer size is plugin number
resChan <- result
//TODO handle cancelFlag here
if pluginHandlerFound && r.Status == contracts.ResultStatusSuccessAndReboot {
// do not execute the the next plugin
break
}
}
// this will clean the orchestration folder for the successful and failed document executions only when the agent is configured
orchestrationDirCleanup(context, len(plugins), pluginOutputs, ioConfig.OrchestrationDirectory)
return
}