in server/internal/server/StartLongRunningOperation.go [21:113]
// StartLongRunningOperation starts a new long running operation for the entity
// identified by the request's entity id and entity type.
//
// It generates a fresh UUID (v4) operation id and then, in this order:
//  1. sends the JSON-marshalled operation request through s.serviceBusSender,
//  2. records the operation as the entity's latest operation in s.entityTableName,
//  3. registers the operation through s.operationContainerClient.
//
// On success the response carries the generated operation id.
//
// Errors from UUID generation, marshalling, the service bus send and
// CreateOperationStatus are returned unchanged. Errors from the entity table
// query are returned as gRPC status errors: codes.AlreadyExists when the entity
// already has an operation that is not in a terminal state, codes.Internal for
// every other database failure.
func (s *Server) StartLongRunningOperation(ctx context.Context, in *pb.StartLongRunningOperationRequest) (*pb.StartLongRunningOperationResponse, error) {
	// Message produced by database.ExecDb when the statement ran but changed nothing.
	//TODO(mheberling): Change this to return a known type of error in aks-async/database, instead of errors.New(...)
	const noRowsAffectedMsg = "No rows were affected!"

	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Starting async operation.")

	operationId, err := uuid.NewV4()
	if err != nil {
		logger.Error("Failed to generate UUID: " + err.Error())
		return nil, err
	}

	operation := &opbus.OperationRequest{
		OperationName:       operations.LroName,
		APIVersion:          "",
		OperationId:         operationId.String(),
		Body:                nil,
		HttpMethod:          "",
		RetryCount:          0,
		EntityId:            in.GetEntityId(),
		EntityType:          in.GetEntityType(),
		ExpirationTimestamp: in.GetExpirationTimestamp(),
	}

	// We add operations in the following order:
	//  1. Put the operationRequest in the service bus.
	//  2. Update the entity store with the desired state introduced by this operation.
	//  3. Add the operation to the operationContainer.
	// We follow this process because if the service bus step fails, we can simply return an error. However,
	// if we first updated the entity store with the desired state (which succeeds) and sending the
	// operationRequest to the service bus then failed, the operation would never reach a terminal state
	// since there would be no processor to process it. The ordering avoids this issue.
	marshalledOperation, err := json.Marshal(operation)
	if err != nil {
		logger.Error("Error marshalling operation: " + err.Error())
		return nil, err
	}

	logger.Info("Sending message to Service Bus")
	if err = s.serviceBusSender.SendMessage(ctx, marshalledOperation); err != nil {
		logger.Error("Error sending message to service bus: " + err.Error())
		return nil, err
	}

	// This single MERGE statement either inserts a row for an entity (entity_id + entity_type) that has no
	// row yet, or overwrites the row of an entity whose last operation is in a terminal/unknown state. Doing
	// the check and the write in one statement avoids race conditions. When the entity exists with a
	// non-terminal operation status, neither branch applies and no row is changed.
	// If this step fails, the processor will simply retry to process the operation until the
	// "Max Delivery Count" is reached, at which point the message will be sent to the DeadLetterQueue,
	// to be set as Failed.
	query := fmt.Sprintf(`
		MERGE INTO %s AS target
		USING (SELECT @p1 AS entity_id_value, @p2 AS entity_type_value, @p3 AS last_operation_id_value, @p4 AS operation_name_value, @p5 AS operation_status_value) AS source
		ON target.entity_id = source.entity_id_value and target.entity_type = source.entity_type_value
		WHEN MATCHED AND (target.operation_status = '%s' OR target.operation_status = '%s' OR target.operation_status = '%s' OR target.operation_status = '%s') THEN
			UPDATE SET
				target.last_operation_id = source.last_operation_id_value,
				target.operation_name = source.operation_name_value,
				target.operation_status = source.operation_status_value
		WHEN NOT MATCHED THEN
			INSERT (entity_id, entity_type, last_operation_id, operation_name, operation_status)
			VALUES (source.entity_id_value, source.entity_type_value, source.last_operation_id_value, source.operation_name_value, source.operation_status_value);
		`, s.entityTableName, oc.Status_COMPLETED.String(), oc.Status_FAILED.String(), oc.Status_CANCELLED.String(), oc.Status_UNKNOWN.String())

	initialOperationStatus := oc.Status_PENDING.String()
	_, err = database.ExecDb(ctx, s.dbClient, query, in.GetEntityId(), in.GetEntityType(), operationId.String(), operations.LroName, initialOperationStatus)
	if err != nil {
		logger.Error("Error in operations query: " + err.Error())

		// If no rows were affected, the ExecDb function returns an error saying `No rows were affected!`.
		// With this error we can conclude that the query ran successfully but did not write the record
		// because the entity already has an operation in a non finalized state.
		if strings.Contains(err.Error(), noRowsAffectedMsg) {
			logger.Error("The combination of entityId " + in.GetEntityId() + " and entityType " + in.GetEntityType() + " was found in a non finalized state. Entity was not updated.")
			return nil, status.Error(codes.AlreadyExists, err.Error())
		}

		// Any other error is a genuine database failure.
		return nil, status.Error(codes.Internal, err.Error())
	}

	createOperationStatusRequest := &oc.CreateOperationStatusRequest{
		OperationName:       operations.LroName,
		EntityId:            in.GetEntityId(),
		ExpirationTimestamp: in.GetExpirationTimestamp(),
		OperationId:         operationId.String(),
	}

	// Add the operation to the db.
	logger.Info("Adding operation to db.")
	if _, err = s.operationContainerClient.CreateOperationStatus(ctx, createOperationStatusRequest); err != nil {
		logger.Error("Error creating operation status: " + err.Error())
		return nil, err
	}

	return &pb.StartLongRunningOperationResponse{OperationId: operationId.String()}, nil
}