in operationsbus/handlers.go [334:390]
// OperationHandler builds the ErrorHandlerFunc that processes a single
// received message as an operation request.
//
// For each message the returned handler:
//  1. unmarshals message.Body into an OperationRequest,
//  2. asks matcher for a hooked operation instance for body.OperationName,
//  3. initializes the operation with the request,
//  4. fetches the entity through entityController, when one is supplied,
//  5. runs the operation's concurrency guard against that entity,
//  6. runs the operation, and
//  7. settles the message.
//
// A failure while unmarshalling or matching returns a *NonRetryError; a
// failure in any later step returns a *RetryError. The handler returns nil
// once the operation has run and the message has been settled.
func OperationHandler(matcher *Matcher, hooks []BaseOperationHooksInterface, entityController EntityController) ErrorHandlerFunc {
	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
		logger := ctxlogger.GetLogger(ctx)

		// 1. Unmarshall the operation
		var body OperationRequest
		err := json.Unmarshal(message.Body, &body)
		if err != nil {
			logger.Error("Error calling unmarshalling message body: " + err.Error())
			return &NonRetryError{Message: "Error unmarshalling message."}
		}

		// 2 Match it with the correct type of operation
		operation, err := matcher.CreateHookedInstace(body.OperationName, hooks)
		if err != nil {
			logger.Error("Operation type doesn't exist in the matcher: " + err.Error())
			return &NonRetryError{Message: "Error creating operation instance."}
		}

		// 3. Init the operation with the information we have.
		_, err = operation.InitOperation(ctx, body)
		if err != nil {
			logger.Error("Something went wrong initializing the operation.")
			return &RetryError{Message: "Error setting operation In Progress"}
		}

		//TODO(mheberling): Remove this after chatting usage is adopted in Guardrails
		// The entity controller is optional: when it is nil, entity keeps its
		// zero value and is handed to the concurrency guard as-is.
		var entity Entity
		if entityController != nil {
			entity, err = entityController.GetEntity(ctx, body)
			if err != nil {
				logger.Error("Something went wrong getting the entity.")
				return &RetryError{Message: "Error getting operationEntity"}
			}
		}

		// 4. Guard against concurrency.
		// GuardConcurrency reports failure through its own return value, not
		// through err. Checking err here would always see nil (every earlier
		// failure has already returned) and would let the operation run even
		// when the guard rejected it.
		ce := operation.GuardConcurrency(ctx, entity)
		if ce != nil {
			logger.Error("Error calling GuardConcurrency: " + ce.Err.Error())
			return &RetryError{Message: "Error guarding operation concurrency."}
		}

		// 5. Call run on the operation
		err = operation.Run(ctx)
		if err != nil {
			logger.Error("Something went wrong running the operation: " + err.Error())
			return &RetryError{Message: "Error running operation."}
		}

		// 6. Finish the message
		settleMessage(ctx, settler, message, nil)
		logger.Info("Operation run successfully!")
		return nil
	}
}