in internal/immediateruncommand/immediateruncommand.go [59:154]
// processImmediateRunCommandGoalStates performs one polling iteration for
// immediate run command goal states.
//
// The iteration:
//  1. computes how many more goal states may start, as maxConcurrentTasks minus
//     the number of currently executing tasks (never negative);
//  2. fetches goal states through communicator using lastProcessedETag;
//  3. hands the keys of every fetched setting to
//     goalStateEventObserver.RemoveProcessedGoalStates;
//  4. launches each goal state selected by getGoalStatesToProcess in its own
//     goroutine and reports a final status itself only when execution fails;
//  5. reports a final "skipped" status for every goal state that did not fit
//     within the remaining capacity.
//
// It returns the ETag the caller should use on the next iteration:
// lastProcessedETag unchanged when no capacity is left, otherwise the ETag
// returned by the fetch (also returned alongside any error). The function does
// not wait for the launched goroutines to finish.
//
// NOTE(review): ExtensionName and SeqNo are dereferenced without nil checks;
// this assumes they are always populated upstream — TODO confirm.
func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgacommunicator.HostGACommunicator, lastProcessedETag string) (string, error) {
	// Read the counter once so the logged value and the capacity decision agree.
	runningTasks := executingTasks.Get()
	maxTasksToFetch := int(math.Max(float64(maxConcurrentTasks-runningTasks), 0))
	ctx.Log("message", fmt.Sprintf("concurrent tasks: %v out of max %v", runningTasks, maxConcurrentTasks))
	if maxTasksToFetch == 0 {
		ctx.Log("warning", "will not fetch new tasks in this iteration as we have reached maximum capacity...")
		return lastProcessedETag, nil
	}

	goalStates, newEtag, err := goalstate.GetImmediateRunCommandGoalStates(ctx, &communicator, lastProcessedETag)
	if err != nil {
		return newEtag, errors.Wrapf(err, "could not retrieve goal states for immediate run command")
	}

	// Collect the key of every setting present in the fetched goal states.
	var goalStateKeys []types.GoalStateKey
	for _, s := range goalStates {
		for _, setting := range s.Settings {
			goalStateKeys = append(goalStateKeys, types.GoalStateKey{ExtensionName: *setting.ExtensionName, SeqNumber: *setting.SeqNo})
		}
	}
	goalStateEventObserver.RemoveProcessedGoalStates(goalStateKeys)

	newGoalStates, skippedGoalStates, err := getGoalStatesToProcess(goalStates, maxTasksToFetch)
	if err != nil {
		return newEtag, errors.Wrap(err, "could not get goal states to process")
	}

	if len(newGoalStates) > 0 {
		ctx.Log("message", fmt.Sprintf("trying to launch %v goal states concurrently", len(newGoalStates)))
		for idx := range newGoalStates {
			// Reserve the execution slot before the goroutine is started. If the
			// increment happened inside the goroutine, this function could return
			// (and the next iteration could compute its capacity) before the
			// counter reflected the goal states launched here.
			ctx.Log("message", "launching new goal state. Incrementing executing tasks counter")
			executingTasks.Increment()

			go func(state settings.SettingsCommon) {
				ctx.Log("message", "adding goal state to the event map")
				statusKey := types.GoalStateKey{ExtensionName: *state.ExtensionName, SeqNumber: *state.SeqNo}
				defaultTopStatus := types.StatusItem{}
				status := types.StatusEventArgs{TopLevelStatus: defaultTopStatus, StatusKey: statusKey}
				notifier := &observer.Notifier{}
				notifier.Register(&goalStateEventObserver)
				notifier.Notify(status)

				startTime := time.Now().UTC().Format(time.RFC3339)
				exitCode, err := goalstate.HandleImmediateGoalState(ctx, state, notifier)

				// Release the slot as soon as execution ends, before any status reporting.
				ctx.Log("message", "goal state has exited. Decrementing executing tasks counter")
				executingTasks.Decrement()

				// If there was an error executing the goal state, report the final status to the HGAP
				// For successful goal states, the status is reported by the usual workflow
				if err != nil {
					ctx.Log("error", "failed to execute goal state", "message", err)
					instView := types.RunCommandInstanceView{
						ExecutionState:   types.Failed,
						ExecutionMessage: "Execution failed",
						ExitCode:         exitCode,
						Output:           "",
						Error:            err.Error(),
						StartTime:        startTime,
						EndTime:          time.Now().UTC().Format(time.RFC3339),
					}
					// NOTE(review): a failed execution is reported with types.StatusSkipped,
					// the same status used for goal states that were never started —
					// confirm this is intended rather than an error status.
					goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)
				}
			}(newGoalStates[idx])
		}
		ctx.Log("message", "finished launching goal states")
	} else {
		ctx.Log("message", "no new goal states were found in this iteration")
	}

	if len(skippedGoalStates) > 0 {
		ctx.Log("message", fmt.Sprintf("skipped %v goal states due to reaching the maximum concurrent tasks", len(skippedGoalStates)))
		for _, skippedGoalState := range skippedGoalStates {
			statusKey := types.GoalStateKey{ExtensionName: *skippedGoalState.ExtensionName, SeqNumber: *skippedGoalState.SeqNo}
			notifier := &observer.Notifier{}
			notifier.Register(&goalStateEventObserver)
			errorMsg := fmt.Sprintf("Exceeded concurrent goal state processing limit. Allowed new goal state count: %d. Extension: %s, SeqNumber: %d", maxTasksToFetch, *skippedGoalState.ExtensionName, *skippedGoalState.SeqNo)

			// A skipped goal state never runs, so start and end share one timestamp.
			skippedAt := time.Now().UTC().Format(time.RFC3339)
			instView := types.RunCommandInstanceView{
				ExecutionState:   types.Failed,
				ExecutionMessage: "Execution was skipped due to reaching the maximum concurrent tasks",
				ExitCode:         constants.ExitCode_SkippedImmediateGoalState,
				Output:           "",
				Error:            errorMsg,
				StartTime:        skippedAt,
				EndTime:          skippedAt,
			}
			goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)
		}
	} else {
		ctx.Log("message", "no goal states were skipped")
	}

	return newEtag, nil
}