pkg/operation/StartAsyncOperation.go (97 lines of code) (raw):

package operation import ( "context" "database/sql" "encoding/json" "fmt" "strings" database "github.com/Azure/aks-async/database" opbus "github.com/Azure/aks-async/operationsbus" "github.com/Azure/aks-async/servicebus" "github.com/Azure/aks-middleware/grpc/server/ctxlogger" "github.com/gofrs/uuid" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" oc "github.com/Azure/OperationContainer/api/v1" "google.golang.org/protobuf/types/known/timestamppb" ) func StartLongRunningOperation(ctx context.Context, operationName string, apiVersion string, body []byte, httpMethod string, retryCount int, entityId string, entityType string, expirationTimestamp *timestamppb.Timestamp, entityTableName string, serviceBusSender servicebus.SenderInterface, operationContainerClient oc.OperationContainerClient, dbClient *sql.DB) (string, error) { logger := ctxlogger.GetLogger(ctx) logger.Info("Starting async operation.") operationId, err := uuid.NewV4() if err != nil { logger.Error("Failed to generate UUID: " + err.Error()) return "", status.Error(codes.Internal, err.Error()) } operation := &opbus.OperationRequest{ OperationName: operationName, APIVersion: apiVersion, OperationId: operationId.String(), Body: body, HttpMethod: httpMethod, RetryCount: retryCount, EntityId: entityId, EntityType: entityType, ExpirationTimestamp: expirationTimestamp, } // We add operations in the following order: // 1. Put the operationRequest in the service bus. // 2. Add the operation to the operationContainer. // 3. Update the entity store with the desired state introduced by this operation. // We follow this process because if the service bus step fails, then we can simply return an error. However, // if we first update the entity store with the desired state (which succeed), and then sending the operationRequest // to the service bus fails, the operation would never be updated to a terminated state since there's // no processor to process it. The ordering avoids this issue. //TODO(mheberling): change to use the marshaler passed in. marshalledOperation, err := json.Marshal(operation) if err != nil { logger.Error("Error marshalling operation with operationId" + operationId.String() + ": " + err.Error()) return "", status.Error(codes.Internal, err.Error()) } logger.Info("Sending message to Service Bus") err = serviceBusSender.SendMessage(ctx, marshalledOperation) if err != nil { logger.Error("Error sending message to service bus with operationId" + operationId.String() + ": " + err.Error()) return "", status.Error(codes.Internal, err.Error()) } createOperationStatusRequest := &oc.CreateOperationStatusRequest{ OperationName: operationName, EntityId: entityId, ExpirationTimestamp: expirationTimestamp, OperationId: operationId.String(), } // Add the operation to the operations db. logger.Info("Creating Operation in OperationContainer.") _, err = operationContainerClient.CreateOperationStatus(ctx, createOperationStatusRequest) if err != nil { logger.Error("Error creating operation status with operationId" + operationId.String() + ": " + err.Error()) return "", status.Error(codes.Internal, err.Error()) } // This query checks that the operationId doesn't already exists and inserts it into the table if successful all in a single query to avoid race conditions. // If this step fails, the processor will simply retry to process the operation until the "Max Delivery Count" is reached, at which point the message // will be sent to the DeadLetterQueue, to be set as Failed. query := fmt.Sprintf(` MERGE INTO %s AS target USING (SELECT @p1 AS entity_id, @p2 AS entity_type, @p3 AS last_operation_id, @p4 AS operation_name, @p5 AS operation_status) AS source ON target.entity_id = source.entity_id and target.entity_type = source.entity_type WHEN MATCHED AND (target.operation_status = '%s' OR target.operation_status = '%s' OR target.operation_status = '%s' OR target.operation_status = '%s') THEN UPDATE SET target.last_operation_id = source.last_operation_id, target.operation_name = source.operation_name, target.operation_status = source.operation_status WHEN NOT MATCHED THEN INSERT (entity_id, entity_type, last_operation_id, operation_name, operation_status) VALUES (source.entity_id, source.entity_type, source.last_operation_id, source.operation_name, source.operation_status); `, entityTableName, oc.Status_COMPLETED.String(), oc.Status_FAILED.String(), oc.Status_CANCELLED.String(), oc.Status_UNKNOWN.String()) initialOperationStatus := oc.Status_PENDING.String() _, err = database.ExecDb(ctx, dbClient, query, entityId, entityType, operationId.String(), operationName, initialOperationStatus) // If no rows were affected, the ExecDb function will throw an error saying `No rows were affected!`. With this error we could // conclude that the query run successfully but didn't insert the record due to another operation in a non-terminal state running // with the same entity (matching entity_type and entity_id). if err != nil { logger.Error("Error in entity update query: " + err.Error() + ". Query options were: OperationId: " + operationId.String() + ", EntityId: " + entityId + ", EntityType: " + entityType + ", OperationName: " + operationName) logger.Info("Updating operation with id " + operationId.String() + " to cancelled in operations database.") cancelledOperationStatus := oc.Status_CANCELLED updateOperationStatusRequest := &oc.UpdateOperationStatusRequest{ OperationId: operationId.String(), Status: cancelledOperationStatus, } _, updateErr := operationContainerClient.UpdateOperationStatus(ctx, updateOperationStatusRequest) if updateErr != nil { logger.Error("Error updating operation status with operationId " + operationId.String() + ": " + updateErr.Error()) joinedErr := fmt.Errorf("Updating entity database: %w. Updating operation to Cancelled: %w.", err, updateErr) return "", status.Error(codes.Internal, joinedErr.Error()) } //TODO(mheberling): Change this to return a known type of error in aks-async/database, instead of errors.New(...) if strings.Contains(err.Error(), "No rows were affected!") { logger.Error("The combination of entityId " + entityId + " and entityType " + entityType + " was found in a non finalized state. Entity was not updated with last_operation_id " + operationId.String()) return "", status.Error(codes.AlreadyExists, err.Error()) } else { return "", status.Error(codes.Internal, err.Error()) } } return operationId.String(), nil }