internal/immediateruncommand/immediateruncommand.go (149 lines of code) (raw):
package immediateruncommand
import (
"fmt"
"math"
"time"
"github.com/Azure/run-command-handler-linux/internal/constants"
"github.com/Azure/run-command-handler-linux/internal/goalstate"
"github.com/Azure/run-command-handler-linux/internal/hostgacommunicator"
"github.com/Azure/run-command-handler-linux/internal/observer"
"github.com/Azure/run-command-handler-linux/internal/requesthelper"
"github.com/Azure/run-command-handler-linux/internal/settings"
"github.com/Azure/run-command-handler-linux/internal/status"
"github.com/Azure/run-command-handler-linux/internal/types"
"github.com/Azure/run-command-handler-linux/pkg/counterutil"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
// maxConcurrentTasks is the upper bound on the number of goal states that may
// be executing at the same time.
const maxConcurrentTasks int32 = 5
// executingTasks tracks how many goal states are currently being executed.
// It is incremented when a goal state is launched and decremented once its
// handler returns, and it is compared against maxConcurrentTasks to decide how
// many new goal states may be picked up in a polling iteration.
var executingTasks counterutil.AtomicCount
// goalStateEventObserver is an observer that listens for status changes in goal states.
// Each goal state is identified by a unique key (types.GoalStateKey) and has a notifier
// associated with it. The notifiers send status updates to the observer, and the
// observer reports the status to the HGAP.
//
// Within this file the observer is initialized by StartImmediateRunCommand, queried by
// getGoalStatesToProcess (via GetStatusForKey) to detect goal states that already have a
// status, and passed the keys of the fetched goal states through
// RemoveProcessedGoalStates on every polling iteration.
var goalStateEventObserver = status.StatusObserver{}
// VMSettingsRequestManager is a stateless type whose GetVMSettingsRequestManager
// method delegates to the hostgacommunicator package. StartImmediateRunCommand
// hands an instance of it to hostgacommunicator.NewHostGACommunicator.
type VMSettingsRequestManager struct{}
// GetVMSettingsRequestManager returns the request manager produced by
// hostgacommunicator.GetVMSettingsRequestManager for the given log context,
// together with any error reported by that call.
func (*VMSettingsRequestManager) GetVMSettingsRequestManager(ctx *log.Context) (*requesthelper.RequestManager, error) {
	requestManager, err := hostgacommunicator.GetVMSettingsRequestManager(ctx)
	return requestManager, err
}
// StartImmediateRunCommand runs the immediate run command service loop.
//
// It builds a HostGA communicator backed by a VMSettingsRequestManager,
// initializes goalStateEventObserver, and then repeatedly calls
// processImmediateRunCommandGoalStates, carrying the last successfully
// processed ETag from one iteration to the next. The loop has no exit
// condition, so the function does not return.
//
// Pacing:
//   - After a failed iteration the error is logged, the ETag is left unchanged,
//     and the loop sleeps for 5 seconds before retrying.
//   - When every execution slot is in use, the loop backs off for a short
//     interval. processImmediateRunCommandGoalStates returns immediately and
//     without error while at capacity, so without this pause the loop would
//     spin (and flood the log) until a running goal state finished.
func StartImmediateRunCommand(ctx *log.Context) error {
	const (
		retryDelay      = 5 * time.Second
		capacityBackoff = time.Second
	)

	ctx.Log("message", "starting immediate run command service")

	vmRequestManager := new(VMSettingsRequestManager)
	communicator := hostgacommunicator.NewHostGACommunicator(vmRequestManager)
	goalStateEventObserver.Initialize(ctx)

	lastProcessedETag := ""
	for {
		ctx.Log("message", "processing new immediate run command goal states. Last processed ETag: "+lastProcessedETag)
		newProcessedETag, err := processImmediateRunCommandGoalStates(ctx, communicator, lastProcessedETag)
		if err != nil {
			ctx.Log("error", errors.Wrap(err, "could not process new immediate run command states because of an unexpected error"))
			ctx.Log("message", "sleep for 5 seconds before retrying")
			time.Sleep(retryDelay)
			continue
		}
		lastProcessedETag = newProcessedETag

		// No free execution slot: the next iteration could not launch anything,
		// so wait a moment for a running goal state to finish instead of spinning.
		if executingTasks.Get() >= maxConcurrentTasks {
			time.Sleep(capacityBackoff)
		}
	}
}
// processImmediateRunCommandGoalStates performs one polling iteration of the
// immediate run command service.
//
// Steps:
//  1. Compute how many execution slots are free (maxConcurrentTasks minus the
//     number of executing tasks, clamped at zero). If none are free, return
//     lastProcessedETag immediately without fetching anything.
//  2. Fetch the immediate run command goal states via
//     goalstate.GetImmediateRunCommandGoalStates.
//  3. Pass the keys of every fetched setting to
//     goalStateEventObserver.RemoveProcessedGoalStates.
//  4. Split the fetched settings with getGoalStatesToProcess into those to
//     launch now and those skipped for lack of capacity.
//  5. Launch each new goal state in its own goroutine; if its handler returns
//     an error, report a final failed instance view for it.
//  6. Report a final instance view for every skipped goal state.
//
// Parameters:
//   - ctx: logging context used for all messages and passed to the goalstate helpers.
//   - communicator: HostGA communicator used to fetch the goal states.
//   - lastProcessedETag: ETag returned by the previous successful iteration.
//
// Returns the ETag reported by the fetch (or lastProcessedETag when at
// capacity) and a wrapped error if fetching or selecting goal states failed.
// The launched goroutines are not awaited.
func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgacommunicator.HostGACommunicator, lastProcessedETag string) (string, error) {
	// Read the counter once so the computed capacity and the logged value agree.
	runningTasks := executingTasks.Get()
	maxTasksToFetch := int(math.Max(float64(maxConcurrentTasks-runningTasks), 0))
	ctx.Log("message", fmt.Sprintf("concurrent tasks: %v out of max %v", runningTasks, maxConcurrentTasks))
	if maxTasksToFetch == 0 {
		ctx.Log("warning", "will not fetch new tasks in this iteration as we have reached maximum capacity...")
		return lastProcessedETag, nil
	}

	goalStates, newEtag, err := goalstate.GetImmediateRunCommandGoalStates(ctx, &communicator, lastProcessedETag)
	if err != nil {
		return newEtag, errors.Wrapf(err, "could not retrieve goal states for immediate run command")
	}

	// Collect the key of every setting present in the fetched goal states and
	// hand them to the observer.
	var goalStateKeys []types.GoalStateKey
	for _, fetched := range goalStates {
		for _, setting := range fetched.Settings {
			goalStateKeys = append(goalStateKeys, types.GoalStateKey{ExtensionName: *setting.ExtensionName, SeqNumber: *setting.SeqNo})
		}
	}
	goalStateEventObserver.RemoveProcessedGoalStates(goalStateKeys)

	newGoalStates, skippedGoalStates, err := getGoalStatesToProcess(goalStates, maxTasksToFetch)
	if err != nil {
		return newEtag, errors.Wrap(err, "could not get goal states to process")
	}

	if len(newGoalStates) > 0 {
		ctx.Log("message", fmt.Sprintf("trying to launch %v goal states concurrently", len(newGoalStates)))
		for idx := range newGoalStates {
			ctx.Log("message", "launching new goal state. Incrementing executing tasks counter")
			// Count the task before its goroutine is scheduled. If the increment
			// happened inside the goroutine, the next polling iteration could read
			// a stale count and launch more than maxConcurrentTasks goal states.
			executingTasks.Increment()

			go func(state settings.SettingsCommon) {
				ctx.Log("message", "adding goal state to the event map")
				statusKey := types.GoalStateKey{ExtensionName: *state.ExtensionName, SeqNumber: *state.SeqNo}
				initialStatus := types.StatusEventArgs{TopLevelStatus: types.StatusItem{}, StatusKey: statusKey}
				notifier := &observer.Notifier{}
				notifier.Register(&goalStateEventObserver)
				notifier.Notify(initialStatus)

				startTime := time.Now().UTC().Format(time.RFC3339)
				exitCode, err := goalstate.HandleImmediateGoalState(ctx, state, notifier)

				ctx.Log("message", "goal state has exited. Decrementing executing tasks counter")
				executingTasks.Decrement()

				// If there was an error executing the goal state, report the final status to the HGAP.
				// For successful goal states, the status is reported by the usual workflow.
				if err != nil {
					ctx.Log("error", "failed to execute goal state", "message", err)
					instView := types.RunCommandInstanceView{
						ExecutionState:   types.Failed,
						ExecutionMessage: "Execution failed",
						ExitCode:         exitCode,
						Output:           "",
						Error:            err.Error(),
						StartTime:        startTime,
						EndTime:          time.Now().UTC().Format(time.RFC3339),
					}
					// NOTE(review): a failed execution is reported with types.StatusSkipped,
					// the same status used for capacity-skipped goal states below — confirm
					// this is intended rather than an error status.
					goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)
				}
			}(newGoalStates[idx])
		}
		ctx.Log("message", "finished launching goal states")
	} else {
		ctx.Log("message", "no new goal states were found in this iteration")
	}

	if len(skippedGoalStates) > 0 {
		ctx.Log("message", fmt.Sprintf("skipped %v goal states due to reaching the maximum concurrent tasks", len(skippedGoalStates)))
		for _, skippedGoalState := range skippedGoalStates {
			statusKey := types.GoalStateKey{ExtensionName: *skippedGoalState.ExtensionName, SeqNumber: *skippedGoalState.SeqNo}
			notifier := &observer.Notifier{}
			notifier.Register(&goalStateEventObserver)

			errorMsg := fmt.Sprintf("Exceeded concurrent goal state processing limit. Allowed new goal state count: %d. Extension: %s, SeqNumber: %d", maxTasksToFetch, *skippedGoalState.ExtensionName, *skippedGoalState.SeqNo)
			// A skipped goal state never runs, so start and end share one timestamp.
			skippedAt := time.Now().UTC().Format(time.RFC3339)
			instView := types.RunCommandInstanceView{
				ExecutionState:   types.Failed,
				ExecutionMessage: "Execution was skipped due to reaching the maximum concurrent tasks",
				ExitCode:         constants.ExitCode_SkippedImmediateGoalState,
				Output:           "",
				Error:            errorMsg,
				StartTime:        skippedAt,
				EndTime:          skippedAt,
			}
			goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)
		}
	} else {
		ctx.Log("message", "no goal states were skipped")
	}

	return newEtag, nil
}
// getGoalStatesToProcess selects, from the fetched goal states, the settings
// that do not yet have a status recorded in goalStateEventObserver.
//
// Goal states whose signature does not validate are ignored. Of the remaining
// unprocessed settings, the first maxTasksToFetch (in iteration order) are
// returned as the first result; any beyond that limit are returned as the
// second result. If validating a signature returns an error, both slices are
// nil and the wrapped error is returned.
func getGoalStatesToProcess(goalStates []hostgacommunicator.ImmediateExtensionGoalState, maxTasksToFetch int) ([]settings.SettingsCommon, []settings.SettingsCommon, error) {
	var (
		toLaunch []settings.SettingsCommon
		overflow []settings.SettingsCommon
	)

	for _, goalState := range goalStates {
		signatureOK, err := goalState.ValidateSignature()
		if err != nil {
			return nil, nil, errors.Wrap(err, "failed to validate goal state signature")
		}
		if !signatureOK {
			continue
		}

		for _, candidate := range goalState.Settings {
			key := types.GoalStateKey{ExtensionName: *candidate.ExtensionName, SeqNumber: *candidate.SeqNo}
			if _, seen := goalStateEventObserver.GetStatusForKey(key); seen {
				// Already tracked by the observer; nothing to do for this setting.
				continue
			}

			if len(toLaunch) >= maxTasksToFetch {
				overflow = append(overflow, candidate)
				continue
			}
			toLaunch = append(toLaunch, candidate)
		}
	}

	return toLaunch, overflow, nil
}