internal/status/immediatestatus.go (105 lines of code) (raw):
package status
import (
"encoding/json"
"fmt"
"slices"
"sync"
"github.com/Azure/run-command-handler-linux/internal/hostgacommunicator"
"github.com/Azure/run-command-handler-linux/internal/types"
"github.com/Azure/run-command-handler-linux/pkg/statusreporter"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
// This is the serializable data contract for VM Aggregate Immediate Status in CRP
type ImmediateTopLevelStatus struct {
AggregateHandlerImmediateStatus []ImmediateHandlerStatus `json:"aggregateHandlerImmediateStatus" validate:"required"`
}
// Status of the handler that is capable of handling immediate goal states
type ImmediateHandlerStatus struct {
HandlerName string `json:"handlerName" validate:"required"`
AggregateImmediateStatus []ImmediateStatus `json:"aggregateImmediateStatus" validate:"required"`
}
// Status of an immediate extension processed by a given handler
type ImmediateStatus struct {
SequenceNumber int `json:"sequenceNumber" validate:"required"`
TimestampUTC string `json:"timestampUTC" validate:"required"`
Status types.StatusItem `json:"status" validate:"required"`
}
// Observer defines a type that can receive notifications from a Notifier.
// Must implement the Observer interface.
type StatusObserver struct {
// goalStateEventMap is a map that stores the goal state key and the status item
// sync.Map is preferred over map because it is safe for concurrent use
goalStateEventMap sync.Map
// ctx is the logger context
ctx *log.Context
// Reporter is the status Reporter
Reporter statusreporter.IGuestInformationServiceClient
}
func (o *StatusObserver) Initialize(ctx *log.Context) {
o.goalStateEventMap = sync.Map{}
o.ctx = ctx
o.Reporter = statusreporter.NewGuestInformationServiceClient(hostgacommunicator.WireServerFallbackAddress)
}
func (o *StatusObserver) OnNotify(status types.StatusEventArgs) error {
o.ctx.Log("message", fmt.Sprintf("Processing status event for goal state with key %v", status.StatusKey))
o.goalStateEventMap.Store(status.StatusKey, status.TopLevelStatus)
return o.reportImmediateStatus(o.getImmediateTopLevelStatusToReport())
}
func (o *StatusObserver) getImmediateTopLevelStatusToReport() ImmediateTopLevelStatus {
o.ctx.Log("message", "Getting all goal states from the event map with the latest status that are not empty")
latestStatusToReport := []ImmediateStatus{}
o.goalStateEventMap.Range(func(key, value interface{}) bool {
// Only report the latest active status for each goal state
if value.(types.StatusItem) != (types.StatusItem{}) {
statusItem := value.(types.StatusItem)
immediateStatus := ImmediateStatus{
SequenceNumber: key.(types.GoalStateKey).SeqNumber,
TimestampUTC: statusItem.TimestampUTC,
Status: statusItem,
}
latestStatusToReport = append(latestStatusToReport, immediateStatus)
}
return true
})
o.ctx.Log("message", "Creating immediate status to report")
return ImmediateTopLevelStatus{
AggregateHandlerImmediateStatus: []ImmediateHandlerStatus{
{
HandlerName: "RunCommandHandler",
AggregateImmediateStatus: latestStatusToReport,
},
},
}
}
func (o *StatusObserver) reportImmediateStatus(immediateStatus ImmediateTopLevelStatus) error {
o.ctx.Log("message", "Marshalling immediate status into json")
rootStatusJson, err := json.Marshal(immediateStatus)
if err != nil {
return fmt.Errorf("status: failed to marshal immediate status into json: %v", err)
}
o.ctx.Log("message", "create request to upload status to: "+o.Reporter.GetPutStatusUri())
response, err := o.Reporter.ReportStatus(string(rootStatusJson))
o.ctx.Log("message", fmt.Sprintf("Status received from request to %v: %v", response.Request.URL, response.Status))
if err != nil {
return errors.Wrap(err, "failed to report status to HGAP")
}
if response.StatusCode != 200 {
return errors.New("failed to report status with error code " + response.Status)
}
return nil
}
// Remove the goal states that have already been processed from the event map
// If the goal state that was added before is not in the new list of goal states, it should be removed
// This is to ensure that the event map only contains the goal states that are currently being processed
func (o *StatusObserver) RemoveProcessedGoalStates(goalStateKeys []types.GoalStateKey) {
// TODO: Eventually we'll need to report also already processed goal states to the HGAP even if they are not in the new list.
// TODO: When a command is sent down with toBeDeleted = true, then we should remove it and no longer report on it.
o.goalStateEventMap.Range(func(key, value interface{}) bool {
if !slices.Contains(goalStateKeys, key.(types.GoalStateKey)) {
o.ctx.Log("message", "removing goal state from the event map", "key", key)
o.goalStateEventMap.Delete(key)
}
return true // continue iterating
})
}
func (o *StatusObserver) GetStatusForKey(key types.GoalStateKey) (types.StatusItem, bool) {
data, ok := o.goalStateEventMap.Load(key)
if ok {
return data.(types.StatusItem), true
}
return types.StatusItem{}, false
}
func (o *StatusObserver) getStatusForAllKeys() []types.StatusItem {
statusItems := []types.StatusItem{}
o.goalStateEventMap.Range(func(key, value interface{}) bool {
statusItems = append(statusItems, value.(types.StatusItem))
return true
})
return statusItems
}