func processImmediateRunCommandGoalStates()

in internal/immediateruncommand/immediateruncommand.go [59:154]


func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgacommunicator.HostGACommunicator, lastProcessedETag string) (string, error) {
	maxTasksToFetch := int(math.Max(float64(maxConcurrentTasks-executingTasks.Get()), 0))
	ctx.Log("message", fmt.Sprintf("concurrent tasks: %v out of max %v", executingTasks.Get(), maxConcurrentTasks))
	if maxTasksToFetch == 0 {
		ctx.Log("warning", "will not fetch new tasks in this iteration as we have reached maximum capacity...")
		return lastProcessedETag, nil
	}

	goalStates, newEtag, err := goalstate.GetImmediateRunCommandGoalStates(ctx, &communicator, lastProcessedETag)
	if err != nil {
		return newEtag, errors.Wrapf(err, "could not retrieve goal states for immediate run command")
	}

	var goalStateKeys []types.GoalStateKey
	for _, s := range goalStates {
		for _, setting := range s.Settings {
			goalStateKeys = append(goalStateKeys, types.GoalStateKey{ExtensionName: *setting.ExtensionName, SeqNumber: *setting.SeqNo})
		}
	}
	goalStateEventObserver.RemoveProcessedGoalStates(goalStateKeys)
	newGoalStates, skippedGoalStates, err := getGoalStatesToProcess(goalStates, maxTasksToFetch)
	if err != nil {
		return newEtag, errors.Wrap(err, "could not get goal states to process")
	}

	if len(newGoalStates) > 0 {
		ctx.Log("message", fmt.Sprintf("trying to launch %v goal states concurrently", len(newGoalStates)))

		for idx := range newGoalStates {
			go func(state settings.SettingsCommon) {
				ctx.Log("message", "launching new goal state. Incrementing executing tasks counter")
				executingTasks.Increment()

				ctx.Log("message", "adding goal state to the event map")
				statusKey := types.GoalStateKey{ExtensionName: *state.ExtensionName, SeqNumber: *state.SeqNo}
				defaultTopStatus := types.StatusItem{}
				status := types.StatusEventArgs{TopLevelStatus: defaultTopStatus, StatusKey: statusKey}

				notifier := &observer.Notifier{}
				notifier.Register(&goalStateEventObserver)
				notifier.Notify(status)
				startTime := time.Now().UTC().Format(time.RFC3339)
				exitCode, err := goalstate.HandleImmediateGoalState(ctx, state, notifier)

				ctx.Log("message", "goal state has exited. Decrementing executing tasks counter")
				executingTasks.Decrement()

				// If there was an error executing the goal state, report the final status to the HGAP
				// For successful goal states, the status is reported by the usual workflow
				if err != nil {
					ctx.Log("error", "failed to execute goal state", "message", err)
					instView := types.RunCommandInstanceView{
						ExecutionState:   types.Failed,
						ExecutionMessage: "Execution failed",
						ExitCode:         exitCode,
						Output:           "",
						Error:            err.Error(),
						StartTime:        startTime,
						EndTime:          time.Now().UTC().Format(time.RFC3339),
					}
					goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)

				}
			}(newGoalStates[idx])
		}

		ctx.Log("message", "finished launching goal states")
	} else {
		ctx.Log("message", "no new goal states were found in this iteration")
	}

	if len(skippedGoalStates) > 0 {
		ctx.Log("message", fmt.Sprintf("skipped %v goal states due to reaching the maximum concurrent tasks", len(skippedGoalStates)))
		for _, skippedGoalState := range skippedGoalStates {
			statusKey := types.GoalStateKey{ExtensionName: *skippedGoalState.ExtensionName, SeqNumber: *skippedGoalState.SeqNo}
			notifier := &observer.Notifier{}
			notifier.Register(&goalStateEventObserver)

			errorMsg := fmt.Sprintf("Exceeded concurrent goal state processing limit. Allowed new goal state count: %d. Extension: %s, SeqNumber: %d", maxTasksToFetch, *skippedGoalState.ExtensionName, *skippedGoalState.SeqNo)
			instView := types.RunCommandInstanceView{
				ExecutionState:   types.Failed,
				ExecutionMessage: "Execution was skipped due to reaching the maximum concurrent tasks",
				ExitCode:         constants.ExitCode_SkippedImmediateGoalState,
				Output:           "",
				Error:            errorMsg,
				StartTime:        time.Now().UTC().Format(time.RFC3339),
				EndTime:          time.Now().UTC().Format(time.RFC3339),
			}
			goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)
		}
	} else {
		ctx.Log("message", "no goal states were skipped")
	}

	return newEtag, nil
}