in pkg/csi_driver/controller.go [132:220]
// CreateVolume handles the CSI CreateVolume RPC for a Lustre-backed volume.
//
// The request name is lower-cased and used both as the volume name and as the
// key of the per-volume lock. After the request has been validated and the
// lock acquired, the Lustre service is asked for the instance described by the
// request:
//
//   - If one is returned, it must match the request and be ACTIVE; it is then
//     returned as the volume without creating anything.
//   - If none is returned, any error recorded on the create operation for this
//     instance is surfaced; otherwise the instance is labelled, given a
//     filesystem name and created.
//
// Errors are returned as gRPC statuses:
//
//   - InvalidArgument: missing name, rejected capabilities, capacity or
//     parameters, or labels that cannot be extracted.
//   - Aborted: the volume lock could not be acquired.
//   - AlreadyExists: the instance found does not match the request.
//   - DeadlineExceeded: the instance found is still in the CREATING state.
//   - Unavailable: the instance found is in any other non-ACTIVE state.
//   - The code stored on the create operation, when that operation carries an error.
//   - Whatever lustre.StatusError produces for errors from the Lustre service
//     or from filesystem-name generation.
func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	// Request validation. Nothing below this section runs without a usable name,
	// capability set, capacity and instance description.
	name := strings.ToLower(req.GetName())
	if name == "" {
		return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
	}

	if capErr := s.driver.validateVolumeCapabilities(req.GetVolumeCapabilities()); capErr != nil {
		return nil, status.Error(codes.InvalidArgument, capErr.Error())
	}

	sizeBytes, err := getRequestCapacity(req.GetCapacityRange())
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	desired, err := s.prepareNewInstance(name, sizeBytes, req.GetParameters(), req.GetAccessibilityRequirements())
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// The lock is keyed by volume name and held until this call returns.
	if !s.volumeLocks.TryAcquire(name) {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, name)
	}
	defer s.volumeLocks.Release(name)

	// A not-found error is expected for a brand-new volume and is not a failure;
	// every other lookup error is.
	instance, err := s.cloudProvider.LustreService.GetInstance(ctx, desired)
	if err != nil && !lustre.IsNotFoundErr(err) {
		return nil, lustre.StatusError(err)
	}

	if instance == nil {
		// No instance yet. Before creating one, look at the create operation for
		// this instance: after a stockout the failed instance is destroyed, so the
		// operation is the only place the failure is recorded. Surfacing its error
		// here keeps the driver from calling CreateInstance again.
		op, opLookupErr := s.cloudProvider.LustreService.GetCreateInstanceOp(ctx, desired)
		if opLookupErr != nil {
			return nil, lustre.StatusError(opLookupErr)
		}
		if op != nil {
			if opErr := op.GetError(); opErr != nil {
				return nil, status.Error(codes.Code(opErr.GetCode()), opErr.GetMessage())
			}
		}

		// Attach the labels derived from the request parameters and the driver name.
		labels, err := extractLabels(req.GetParameters(), s.driver.config.Name)
		if err != nil {
			return nil, status.Error(codes.InvalidArgument, err.Error())
		}
		desired.Labels = labels

		// The filesystem name is a generated identifier (a random 8-character
		// alphanumeric string) that the helper checks against the filesystem names
		// already present in the target region, regenerating on conflict.
		fsName, err := s.generateUniqueFilesystemName(ctx, desired)
		if err != nil {
			return nil, lustre.StatusError(err)
		}
		desired.Filesystem = fsName

		instance, err = s.cloudProvider.LustreService.CreateInstance(ctx, desired)
		if err != nil {
			return nil, lustre.StatusError(err)
		}
	} else {
		klog.V(4).Infof("Found existing instance %+v, current instance %+v", instance, desired)

		// An instance already exists under this name; it has to satisfy the request.
		if mismatch := lustre.CompareInstances(desired, instance); mismatch != nil {
			return nil, status.Error(codes.AlreadyExists, mismatch.Error())
		}

		// Only an ACTIVE instance can be handed back. One that is still being
		// created is reported as DeadlineExceeded, any other state as Unavailable.
		if instance.State != lustrepb.Instance_ACTIVE.String() {
			notReadyCode := codes.Unavailable
			if instance.State == lustrepb.Instance_CREATING.String() {
				notReadyCode = codes.DeadlineExceeded
			}
			notReady := fmt.Sprintf("Volume %v not ready, current state: %v", name, instance.State)
			klog.V(4).Info(notReady)
			return nil, status.Error(notReadyCode, notReady)
		}
	}

	created := &csi.CreateVolumeResponse{
		Volume: instanceToCSIVolume(instance),
	}
	klog.Infof("CreateVolume succeeded: %+v", created)
	return created, nil
}