operationsbus/operation_request.go (95 lines of code) (raw):

package operationsbus import ( "context" "encoding/json" "errors" "fmt" "reflect" sb "github.com/Azure/aks-async/servicebus" "github.com/Azure/aks-middleware/grpc/server/ctxlogger" "google.golang.org/protobuf/types/known/timestamppb" ) // All the fields that the operations might need. This struct will be part of every operation. type OperationRequest struct { OperationName string // Name of the operation being processed. Used to match the ApiOperation with the right implementation. APIVersion string // Specifies the version of the API the operation is associated with, ensuring compatibility. RetryCount int // Tracks the number of retries of the operation to prevent infinite looping or special logic around retries. OperationId string // A unique identifier for the operation. EntityId string // A unique identifier for the entity (resource) the operation is acting on, used with EntityType to ensure we have selected the right entity. EntityType string // Specified the type of entity the operation is acting on, used with EntityId to ensure we have selected the right Entity. ExpirationTimestamp *timestamppb.Timestamp // Defines when the operation should expire and prevent execution, should it have passed this date. // HTTP Body []byte // Contains request payload or data needed for the operation in HTTP operations. HttpMethod string // Indicated the GGPT method if the operation requires HTTP-based communication. Extension interface{} // An optional and flexible field to add any data the user may require. } func NewOperationRequest( operationName string, apiVersion string, operationId string, entityId string, entityType string, retryCount int, expirationTimestamp *timestamppb.Timestamp, body []byte, httpMethod string, extension interface{}, ) *OperationRequest { return &OperationRequest{ OperationName: operationName, APIVersion: apiVersion, RetryCount: retryCount, OperationId: operationId, EntityId: entityId, EntityType: entityType, ExpirationTimestamp: expirationTimestamp, Body: body, HttpMethod: httpMethod, Extension: extension, } } // Generalized method to retry every operation. If the operation failed or hit an error at any stage, this method will be called after the panic is handled. func (opRequest *OperationRequest) Retry(ctx context.Context, sender sb.SenderInterface) error { logger := ctxlogger.GetLogger(ctx) logger.Info("Retrying the long running operation.") logger.Info(fmt.Sprintf("Struct: %+v", opRequest)) opRequest.RetryCount++ logger.Info(fmt.Sprintf("Current retry: %d", opRequest.RetryCount)) marshalledOperation, err := json.Marshal(opRequest) if err != nil { logger.Error("Error marshalling operation: " + err.Error()) return err } logger.Info("Sending message to Service Bus") err = sender.SendMessage(ctx, []byte(marshalledOperation)) if err != nil { logger.Error("Something happened: " + err.Error()) return err } return nil } // SetExtension sets the Extension field to a new type and value, copying data if possible func (opRequest *OperationRequest) SetExtension(newValue interface{}) error { newType := reflect.TypeOf(newValue) if newType == nil { return errors.New("new value is nil") } // Create a new instance of the type newInstance := reflect.New(newType).Elem() if opRequest.Extension != nil { oldValue := reflect.ValueOf(opRequest.Extension) if oldValue.Kind() == reflect.Ptr { oldValue = oldValue.Elem() } if oldValue.Type().AssignableTo(newType) { newInstance.Set(oldValue) } else { // Handle conversion based on known types or provide a custom conversion data, err := json.Marshal(opRequest.Extension) if err != nil { return err } if err := json.Unmarshal(data, newInstance.Addr().Interface()); err != nil { return err } } } else { // Initialize with zero values if Extension is nil newInstance.Set(reflect.Zero(newType)) } // opRequest.ExtensionType = newType opRequest.Extension = newInstance.Interface() return nil }