func()

in pkg/csi_driver/controller.go [118:189]


// CreateVolume handles the CSI CreateVolume call by making sure a bucket
// backing the requested volume exists.
//
// The volume ID, which is also used as the bucket name, is the lowercased
// request name. The request must carry a name, volume capabilities accepted
// by the driver, a valid capacity range, and a "projectID" entry in its
// secrets; a violation of any of these yields codes.InvalidArgument.
//
// Work on a given volume ID is serialized through s.volumeLocks: when the
// lock cannot be taken the call returns codes.Aborted. If a bucket is found
// it is compared against the requested one and a mismatch is reported as
// codes.AlreadyExists; if none is found, labels are extracted from the
// request parameters and a new bucket is created. Failure to prepare the
// storage service is reported as codes.Unauthenticated, and lookup or
// creation failures as codes.Internal.
func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	// Request validation: name, capabilities, capacity, then secrets.
	requestName := req.GetName()
	if requestName == "" {
		return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
	}
	volumeID := strings.ToLower(requestName)

	if capErr := s.driver.validateVolumeCapabilities(req.GetVolumeCapabilities()); capErr != nil {
		return nil, status.Error(codes.InvalidArgument, capErr.Error())
	}

	sizeBytes, sizeErr := getRequestCapacity(req.GetCapacityRange())
	if sizeErr != nil {
		return nil, status.Error(codes.InvalidArgument, sizeErr.Error())
	}

	secrets := req.GetSecrets()
	projectID, hasProject := secrets["projectID"]
	if !hasProject {
		return nil, status.Error(codes.InvalidArgument, "projectID must be provided in secret")
	}

	// Only one operation per volume ID may be in flight at a time.
	if !s.volumeLocks.TryAcquire(volumeID) {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
	}
	defer s.volumeLocks.Release(volumeID)

	svc, svcErr := s.prepareStorageService(ctx, secrets)
	if svcErr != nil {
		return nil, status.Errorf(codes.Unauthenticated, "failed to prepare storage service: %v", svcErr)
	}
	defer svc.Close()

	// Description of the bucket this request asks for; labels are filled in
	// later, and only when the bucket actually has to be created.
	wanted := &storage.ServiceBucket{
		Project:                        projectID,
		Name:                           volumeID,
		SizeBytes:                      sizeBytes,
		EnableUniformBucketLevelAccess: true,
	}

	// Look the bucket up first; a "does not exist" error is not a failure here.
	existing, lookupErr := svc.GetBucket(ctx, wanted)
	if lookupErr != nil && !storage.IsNotExistErr(lookupErr) {
		return nil, status.Error(codes.Internal, lookupErr.Error())
	}

	if existing != nil {
		klog.V(4).Infof("Found existing bucket %+v, current bucket %+v\n", existing, wanted)
		// Reuse the existing bucket only if it matches what was requested.
		if cmpErr := storage.CompareBuckets(wanted, existing); cmpErr != nil {
			return nil, status.Error(codes.AlreadyExists, cmpErr.Error())
		}
		return &csi.CreateVolumeResponse{Volume: bucketToCSIVolume(existing)}, nil
	}

	// No bucket yet: attach labels derived from the request parameters and create it.
	labels, labelErr := extractLabels(req.GetParameters(), s.driver.config.Name)
	if labelErr != nil {
		return nil, status.Error(codes.InvalidArgument, labelErr.Error())
	}
	wanted.Labels = labels

	created, createErr := svc.CreateBucket(ctx, wanted)
	if createErr != nil {
		return nil, status.Error(codes.Internal, createErr.Error())
	}

	return &csi.CreateVolumeResponse{Volume: bucketToCSIVolume(created)}, nil
}