func()

in pkg/csi_driver/controller.go [132:220]


func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	// Validate arguments
	volumeName := strings.ToLower(req.GetName())
	if len(volumeName) == 0 {
		return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
	}

	if err := s.driver.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	capBytes, err := getRequestCapacity(req.GetCapacityRange())
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	newInstance, err := s.prepareNewInstance(volumeName, capBytes, req.GetParameters(), req.GetAccessibilityRequirements())
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	if acquired := s.volumeLocks.TryAcquire(volumeName); !acquired {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeName)
	}
	defer s.volumeLocks.Release(volumeName)

	instance, err := s.cloudProvider.LustreService.GetInstance(ctx, newInstance)
	if err != nil && !lustre.IsNotFoundErr(err) {
		return nil, lustre.StatusError(err)
	}
	if instance != nil {
		klog.V(4).Infof("Found existing instance %+v, current instance %+v", instance, newInstance)
		// Instance already exists, check if it meets the request.
		if err := lustre.CompareInstances(newInstance, instance); err != nil {
			return nil, status.Error(codes.AlreadyExists, err.Error())
		}
		// Check if the Lustre instance is in the process of getting created.
		if instance.State == lustrepb.Instance_CREATING.String() {
			msg := fmt.Sprintf("Volume %v not ready, current state: %v", volumeName, instance.State)
			klog.V(4).Info(msg)

			return nil, status.Error(codes.DeadlineExceeded, msg)
		}
		if instance.State != lustrepb.Instance_ACTIVE.String() {
			msg := fmt.Sprintf("Volume %v not ready, current state: %v", volumeName, instance.State)
			klog.V(4).Info(msg)

			return nil, status.Error(codes.Unavailable, msg)
		}
	} else {
		// In the event of a stockout, the instance will be destroyed if the CreateInstance operation fails.
		// We should query the operation to retrieve the error and prevent the CSI driver from attempting to call CreateInstance again.
		op, err := s.cloudProvider.LustreService.GetCreateInstanceOp(ctx, newInstance)
		if err != nil {
			return nil, lustre.StatusError(err)
		}
		if op != nil && op.GetError() != nil {
			return nil, status.Error(codes.Code(op.GetError().GetCode()), op.GetError().GetMessage())
		}
		// Add labels.
		param := req.GetParameters()
		labels, err := extractLabels(param, s.driver.config.Name)
		if err != nil {
			return nil, status.Error(codes.InvalidArgument, err.Error())
		}
		newInstance.Labels = labels

		// The filesystem name for the lustre instance will be a randomly generated 8-character alphanumeric identifier to ensure uniqueness.
		// The CSI driver will validate this identifier by checking for duplicates against existing filesystem names within the target region.
		// If a conflict is found, a new identifier will be generated until uniqueness is confirmed.
		fsname, err := s.generateUniqueFilesystemName(ctx, newInstance)
		if err != nil {
			return nil, lustre.StatusError(err)
		}
		newInstance.Filesystem = fsname

		instance, err = s.cloudProvider.LustreService.CreateInstance(ctx, newInstance)
		if err != nil {
			return nil, lustre.StatusError(err)
		}
	}

	resp := &csi.CreateVolumeResponse{
		Volume: instanceToCSIVolume(instance),
	}
	klog.Infof("CreateVolume succeeded: %+v", resp)

	return resp, nil
}