operationsbus/handlers.go (328 lines of code) (raw):
package operationsbus
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
oc "github.com/Azure/OperationContainer/api/v1"
sb "github.com/Azure/aks-async/servicebus"
"github.com/Azure/aks-middleware/grpc/server/ctxlogger"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-shuttle/v2"
)
// RetryError reports a failure that may succeed if the message is processed
// again later. The handlers in this package react to it by abandoning the
// message (and, with an OperationContainer, marking the operation Pending).
type RetryError struct {
	// Message is a human-readable description of what went wrong.
	Message string
}

// Error implements the error interface, prefixing the message with "RetryError: ".
func (re *RetryError) Error() string {
	return fmt.Sprintf("RetryError: %v", re.Message)
}
// NonRetryError reports a failure that will not be fixed by processing the
// message again. The handlers in this package react to it by dead-lettering the
// message (and, with an OperationContainer, marking the operation Cancelled).
type NonRetryError struct {
	// Message is a human-readable description of what went wrong.
	Message string
}

// Error implements the error interface, prefixing the message with "NonRetryError: ".
func (ne *NonRetryError) Error() string {
	return fmt.Sprintf("NonRetryError: %v", ne.Message)
}
// ErrorHandler processes a Service Bus message and reports failure through its
// returned error rather than by panicking, so enclosing handlers can inspect the
// error (for example *RetryError or *NonRetryError) and react to it.
type ErrorHandler interface {
	Handle(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error
}

// ErrorHandlerFunc adapts an ordinary function to the ErrorHandler interface.
type ErrorHandlerFunc func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error

// Compile-time check that ErrorHandlerFunc keeps satisfying ErrorHandler.
var _ ErrorHandler = ErrorHandlerFunc(nil)

// Handle calls f(ctx, settler, message) and returns its error.
func (f ErrorHandlerFunc) Handle(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
	return f(ctx, settler, message)
}
// DefaultHandlers assembles the standard shuttle handler chain for operation
// messages. From outermost to innermost:
//
//	shuttle panic handler -> lock renewal every 10s -> NewLogHandler ->
//	NewQosErrorHandler -> [NewOperationContainerHandler, only when
//	operationContainer is non-nil] -> NewErrorReturnHandler -> OperationHandler
//
// serviceBusReceiver is handed to NewErrorReturnHandler for abandoning messages
// on *RetryError; logger is handed to NewLogHandler.
func DefaultHandlers(
	serviceBusReceiver sb.ReceiverInterface,
	matcher *Matcher,
	operationContainer oc.OperationContainerClient,
	entityController EntityController,
	logger *slog.Logger,
	hooks []BaseOperationHooksInterface,
) shuttle.HandlerFunc {
	// Innermost stage: run the operation, then act on the classified error.
	core := NewErrorReturnHandler(
		OperationHandler(matcher, hooks, entityController),
		serviceBusReceiver,
		nil,
	)
	// Optionally track the operation's status in the OperationContainer.
	if operationContainer != nil {
		core = NewOperationContainerHandler(core, operationContainer)
	}

	renewEvery := 10 * time.Second
	return shuttle.NewPanicHandler(
		nil,
		shuttle.NewRenewLockHandler(
			&shuttle.LockRenewalOptions{Interval: &renewEvery},
			NewLogHandler(logger, NewQosErrorHandler(core)),
		),
	)
}
// NewQosErrorHandler adapts errHandler into a shuttle.HandlerFunc that times
// it. It logs the start time, the end time and the elapsed duration, and then
// logs the error errHandler returned, if any. The logger comes from ctx.
func NewQosErrorHandler(errHandler ErrorHandlerFunc) shuttle.HandlerFunc {
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
		lg := ctxlogger.GetLogger(ctx)

		began := time.Now()
		handlerErr := errHandler.Handle(ctx, settler, message)
		finished := time.Now()

		lg.Info("QoS: Operation started at: " + began.String())
		lg.Info("QoS: Operation processed at: " + finished.String())
		lg.Info("QoS: Operation took " + finished.Sub(began).String() + " to process.")

		if handlerErr == nil {
			return
		}
		lg.Error("QoS: Error ocurred in previousHandler: " + handlerErr.Error())
	}
}
// NewQoSHandler wraps next and logs when it started, when it finished and how
// long it took. If logger is nil, the logger stored in each message's context
// is used for that message.
func NewQoSHandler(logger *slog.Logger, next shuttle.HandlerFunc) shuttle.HandlerFunc {
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
		// Resolve the logger per call. Writing the fallback back into the
		// captured `logger` would be a data race if this handler is invoked
		// concurrently. It would also make every later message reuse the first
		// message's context logger.
		lg := logger
		if lg == nil {
			lg = ctxlogger.GetLogger(ctx)
		}

		start := time.Now()
		next(ctx, settler, message)
		end := time.Now()

		lg.Info("QoSHandler: Operation started at: " + start.String())
		lg.Info("QoSHandler: Operation processed at: " + end.String())
		lg.Info("QoSHandler: Operation took " + end.Sub(start).String() + " to process.")
	}
}
// NewErrorHandler is the shuttle.HandlerFunc counterpart of
// NewErrorReturnHandler, with the same classification logic:
//   - *NonRetryError goes to nonRetryOperationError,
//   - *RetryError goes to retryOperationError (using receiver),
//   - any other error is only logged.
//
// next, if non-nil, is invoked afterwards. shuttle handlers cannot return
// values, so the error is dropped after it has been logged and acted on.
func NewErrorHandler(errHandler ErrorHandlerFunc, receiver sb.ReceiverInterface, next shuttle.HandlerFunc) shuttle.HandlerFunc {
	classified := NewErrorReturnHandler(errHandler, receiver, next)
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
		// Logging and settlement already happened inside classified.
		_ = classified(ctx, settler, message)
	}
}
// NewErrorReturnHandler wraps errHandler with error classification and hands
// the error back to its caller (for example NewQosErrorHandler, which logs it).
//
// When errHandler fails:
//   - *NonRetryError is passed to nonRetryOperationError,
//   - *RetryError is passed to retryOperationError together with receiver,
//   - any other error is only logged.
//
// next, when non-nil, runs afterwards regardless of the outcome. The error from
// errHandler is returned unchanged.
func NewErrorReturnHandler(errHandler ErrorHandlerFunc, receiver sb.ReceiverInterface, next shuttle.HandlerFunc) ErrorHandlerFunc {
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
		handlerErr := errHandler.Handle(ctx, settler, message)

		if handlerErr != nil {
			lg := ctxlogger.GetLogger(ctx)
			lg.Error("ErrorHandler: Handling error: " + handlerErr.Error())
			if _, isNonRetry := handlerErr.(*NonRetryError); isNonRetry {
				lg.Info("ErrorHandler: Handling NonRetryError.")
				nonRetryOperationError(ctx, settler, message)
			} else if _, isRetry := handlerErr.(*RetryError); isRetry {
				lg.Info("ErrorHandler: Handling RetryError.")
				retryOperationError(receiver, ctx, settler, message)
			} else {
				lg.Info("Error handled: " + handlerErr.Error())
			}
		}

		if next != nil {
			next(ctx, settler, message)
		}
		return handlerErr
	}
}
// NewOperationContainerHandler wraps errHandler with OperationContainer status
// tracking for the operation whose OperationId is in the message body.
//
// Before errHandler runs, the operation is set to IN_PROGRESS. This is retried
// with a short, growing delay, because the message can arrive before the
// OperationContainer has finished persisting the operation. If it still fails,
// errHandler is not run and the last error is returned. After errHandler
// returns, the operation is set to:
//   - COMPLETED when it returned nil,
//   - CANCELLED when it returned a *NonRetryError,
//   - PENDING when it returned a *RetryError,
//   - unchanged for any other error.
//
// The returned error is errHandler's error, so outer handlers such as
// NewQosErrorHandler can log it. If a status update fails, that failure is
// returned instead. If the body cannot be decoded, no status can be tracked;
// the message is passed straight to errHandler so it can still be handled and
// settled.
func NewOperationContainerHandler(errHandler ErrorHandlerFunc, operationContainer oc.OperationContainerClient) ErrorHandlerFunc {
	const (
		inProgressAttempts  = 5
		inProgressBaseDelay = 500 * time.Millisecond
	)

	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
		logger := ctxlogger.GetLogger(ctx)

		var body OperationRequest
		if err := json.Unmarshal(message.Body, &body); err != nil {
			logger.Error("OperationContainerHandler: Error unmarshalling message: " + err.Error())
			// Without an OperationId there is nothing to track. Returning nil here
			// would leave the message unsettled and hide the failure, so let the
			// wrapped handler deal with it.
			return errHandler.Handle(ctx, settler, message)
		}

		setStatus := func(status oc.Status) error {
			_, err := operationContainer.UpdateOperationStatus(ctx, &oc.UpdateOperationStatusRequest{
				OperationId: body.OperationId,
				Status:      status,
			})
			return err
		}

		// If the operation is picked up immediately from the service bus, while the
		// operationContainer is still putting the operation into the hcp and
		// operations databases, this step might fail. Back off between attempts
		// (0.5s, 1s, 1.5s, 2s) so the databases have time to catch up.
		var err error
		for attempt := 1; attempt <= inProgressAttempts; attempt++ {
			if err = setStatus(oc.Status_IN_PROGRESS); err == nil {
				break
			}
			logger.Error("OperationContainerHandler: Error setting operation in progress: " + err.Error())
			if attempt == inProgressAttempts {
				break
			}
			logger.Info("Trying again.")
			wait := time.NewTimer(time.Duration(attempt) * inProgressBaseDelay)
			select {
			case <-ctx.Done():
				wait.Stop()
				logger.Error("Operation was not able to be put in progress.")
				return ctx.Err()
			case <-wait.C:
			}
		}
		if err != nil {
			logger.Error("Operation was not able to be put in progress.")
			return err
		}

		handlerErr := errHandler.Handle(ctx, settler, message)
		if handlerErr == nil {
			logger.Info("Setting Operation as Successful.")
			if statusErr := setStatus(oc.Status_COMPLETED); statusErr != nil {
				logger.Error("OperationContainerHandler: Something went wrong setting the operation as Completed: " + statusErr.Error())
				return statusErr
			}
			return nil
		}

		logger.Info("OperationContainerHandler: Handling error: " + handlerErr.Error())
		switch handlerErr.(type) {
		case *NonRetryError:
			logger.Info("OperationContainerHandler: Setting operation as Cancelled.")
			if statusErr := setStatus(oc.Status_CANCELLED); statusErr != nil {
				logger.Error("OperationContainerHandler: Something went wrong setting the operation as Cancelled: " + statusErr.Error())
				return statusErr
			}
		case *RetryError:
			logger.Info("OperationContainerHandler: Setting operation as Pending.")
			if statusErr := setStatus(oc.Status_PENDING); statusErr != nil {
				logger.Error("OperationContainerHandler: Something went wrong setting the operation as Pending: " + statusErr.Error())
				return statusErr
			}
		default:
			logger.Info("OperationContainerHandler: Error type not recognized. Operation status not changed.")
		}
		// Report the handler's own error upward. A successful status update must
		// not mask it.
		return handlerErr
	}
}
// NewLogHandler stores a logger in the context passed to next and logs basic
// message metadata: the delivery count, the correlation ID (when set) and the
// OperationId decoded from the body. If logger is nil, the logger already
// present in each message's context is used for that message.
func NewLogHandler(logger *slog.Logger, next shuttle.HandlerFunc) shuttle.HandlerFunc {
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
		// Resolve the logger per call. Writing the fallback back into the
		// captured `logger` would be a data race if this handler is invoked
		// concurrently. It would also make every later message reuse the first
		// message's context logger.
		lg := logger
		if lg == nil {
			lg = ctxlogger.GetLogger(ctx)
		}
		newCtx := ctxlogger.WithLogger(ctx, lg)

		lg.Info("LogHandler: Delivery count: " + fmt.Sprint(message.DeliveryCount))
		if message.CorrelationID != nil {
			lg.Info("LogHandler: Corrolation Id: " + *message.CorrelationID)
		}

		var body OperationRequest
		if err := json.Unmarshal(message.Body, &body); err != nil {
			lg.Error("LogHandler: Error unmarshalling message:" + err.Error())
		}
		// Logged even on a decode error: json.Unmarshal may still have filled in
		// OperationId before hitting a bad field.
		lg.Info("LogHandler: OperationId: " + body.OperationId)

		next(newCtx, settler, message)
	}
}
// nonRetryOperationError handles a *NonRetryError by dead-lettering message via
// deadLetterMessage, which logs any dead-letter failure itself. It always
// returns nil.
func nonRetryOperationError(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Non Retry Operation Error.")
	// The body is deliberately not decoded here. Its contents are not needed
	// to dead-letter the message, and an undecodable body is itself a source of
	// NonRetryError. Bailing out on a decode failure would leave exactly those
	// messages unsettled.
	deadLetterMessage(ctx, settler, message, nil)
	return nil
}
// retryOperationError handles a *RetryError by abandoning message so it can be
// delivered again. The abandon goes through the Azure receiver obtained from
// receiver. If receiver is nil, it falls back to settler, which can abandon the
// same message. Failures are logged and returned.
//
// NOTE: the receiver-before-ctx parameter order is kept for compatibility with
// existing callers.
func retryOperationError(receiver sb.ReceiverInterface, ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Abandoning message for retry.")

	if receiver == nil {
		// Calling GetAzureReceiver on a nil interface would panic.
		if err := settler.AbandonMessage(ctx, message, nil); err != nil {
			logger.Error("Error abandoning message: " + err.Error())
			return err
		}
		return nil
	}

	azReceiver, err := receiver.GetAzureReceiver()
	if err != nil {
		// Callers ignore the returned error, so log it here.
		logger.Error("Error getting Azure receiver: " + err.Error())
		return err
	}

	// Retry the message.
	if err := azReceiver.AbandonMessage(ctx, message, nil); err != nil {
		logger.Error("Error abandoning message: " + err.Error())
		return err
	}
	return nil
}
// OperationHandler returns the core handler that executes one operation message:
//  1. decode the body into an OperationRequest,
//  2. build the matching operation (wrapped with hooks) via matcher,
//  3. initialize it,
//  4. load the entity (only if entityController is non-nil) and guard concurrency,
//  5. run it, and
//  6. complete the message.
//
// It returns a *NonRetryError when the body cannot be decoded or the operation
// type is unknown. It returns a *RetryError when initialization, the entity
// lookup, the concurrency guard or Run fails. The message is completed only on
// success; the enclosing error handler decides how to settle failures.
func OperationHandler(matcher *Matcher, hooks []BaseOperationHooksInterface, entityController EntityController) ErrorHandlerFunc {
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
		logger := ctxlogger.GetLogger(ctx)

		// 1. Unmarshal the operation.
		var body OperationRequest
		if err := json.Unmarshal(message.Body, &body); err != nil {
			logger.Error("Error calling unmarshalling message body: " + err.Error())
			return &NonRetryError{Message: "Error unmarshalling message."}
		}

		// 2. Match it with the correct type of operation.
		operation, err := matcher.CreateHookedInstace(body.OperationName, hooks)
		if err != nil {
			logger.Error("Operation type doesn't exist in the matcher: " + err.Error())
			return &NonRetryError{Message: "Error creating operation instance."}
		}

		// 3. Init the operation with the information we have.
		if _, err = operation.InitOperation(ctx, body); err != nil {
			logger.Error("Something went wrong initializing the operation: " + err.Error())
			return &RetryError{Message: "Error setting operation In Progress"}
		}

		//TODO(mheberling): Remove this after chatting usage is adopted in Guardrails
		var entity Entity
		if entityController != nil {
			entity, err = entityController.GetEntity(ctx, body)
			if err != nil {
				logger.Error("Something went wrong getting the entity: " + err.Error())
				return &RetryError{Message: "Error getting operationEntity"}
			}
		}

		// 4. Guard against concurrency. The guard reports failure through its own
		// return value; the previous `err` is unrelated and nil at this point.
		if ce := operation.GuardConcurrency(ctx, entity); ce != nil {
			if ce.Err != nil {
				logger.Error("Error calling GuardConcurrency: " + ce.Err.Error())
			} else {
				logger.Error("Error calling GuardConcurrency.")
			}
			return &RetryError{Message: "Error guarding operation concurrency."}
		}

		// 5. Call run on the operation.
		if err = operation.Run(ctx); err != nil {
			logger.Error("Something went wrong running the operation: " + err.Error())
			return &RetryError{Message: "Error running operation."}
		}

		// 6. Finish the message.
		settleMessage(ctx, settler, message, nil)
		logger.Info("Operation run successfully!")
		return nil
	}
}
// settleMessage completes message through settler, forwarding options. A
// failure is logged together with the underlying error and is not returned.
func settleMessage(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Settling message.")
	if err := settler.CompleteMessage(ctx, message, options); err != nil {
		logger.Error("Unable to settle message: " + err.Error())
	}
}
// deadLetterMessage dead-letters message through settler, forwarding options. A
// failure is logged together with the underlying error and is not returned.
func deadLetterMessage(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("DeadLettering message.")
	if err := settler.DeadLetterMessage(ctx, message, options); err != nil {
		logger.Error("Unable to deadletter message: " + err.Error())
	}
}