in pkg/csi_driver/controller.go [118:189]
// CreateVolume handles the CSI CreateVolume RPC by making sure a storage
// bucket exists for the requested volume and returning it as a CSI volume.
//
// The volume ID is the lower-cased request name; it is used both as the key
// for the per-volume operation lock and as the bucket name. If a bucket with
// that name already exists it is reused, provided storage.CompareBuckets
// accepts it against the request; otherwise a new bucket is created with
// labels derived from the request parameters.
//
// Returned gRPC status codes:
//   - InvalidArgument: the name is empty, the volume capabilities or capacity
//     range are rejected, the "projectID" secret is missing or empty, or the
//     labels cannot be extracted from the request parameters.
//   - Aborted: another operation already holds the lock for this volume ID.
//   - Unauthenticated: the storage service could not be prepared from the
//     request secrets.
//   - AlreadyExists: a bucket with this name exists but does not match the
//     request.
//   - Internal: looking up the bucket failed for a reason other than
//     "not exist", or creating the bucket failed.
func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	// Validate arguments before taking the lock or touching the backend.
	name := req.GetName()
	if len(name) == 0 {
		return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
	}
	volumeID := strings.ToLower(name)

	if err := s.driver.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	capBytes, err := getRequestCapacity(req.GetCapacityRange())
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	secrets := req.GetSecrets()
	// Indexing a nil map is safe and yields "", so a single emptiness check
	// rejects a missing secrets map, a missing key, and a present-but-empty
	// value alike. An empty project ID must not reach the storage backend.
	projectID := secrets["projectID"]
	if projectID == "" {
		return nil, status.Error(codes.InvalidArgument, "projectID must be provided in secret")
	}

	// Only one operation per volume ID may be in flight at a time.
	if acquired := s.volumeLocks.TryAcquire(volumeID); !acquired {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
	}
	defer s.volumeLocks.Release(volumeID)

	newBucket := &storage.ServiceBucket{
		Project:                        projectID,
		Name:                           volumeID,
		SizeBytes:                      capBytes,
		EnableUniformBucketLevelAccess: true,
	}

	storageService, err := s.prepareStorageService(ctx, secrets)
	if err != nil {
		return nil, status.Errorf(codes.Unauthenticated, "failed to prepare storage service: %v", err)
	}
	defer storageService.Close()

	// Check if the bucket already exists. A "not exist" error is the expected
	// outcome for a fresh volume and falls through to the create branch.
	bucket, err := storageService.GetBucket(ctx, newBucket)
	if err != nil && !storage.IsNotExistErr(err) {
		return nil, status.Error(codes.Internal, err.Error())
	}

	if bucket != nil {
		klog.V(4).Infof("Found existing bucket %+v, current bucket %+v\n", bucket, newBucket)
		// Bucket already exists, check if it meets the request.
		if err = storage.CompareBuckets(newBucket, bucket); err != nil {
			return nil, status.Error(codes.AlreadyExists, err.Error())
		}
	} else {
		// Labels are only attached when the bucket is created by this call.
		labels, labelErr := extractLabels(req.GetParameters(), s.driver.config.Name)
		if labelErr != nil {
			return nil, status.Error(codes.InvalidArgument, labelErr.Error())
		}
		newBucket.Labels = labels

		// Assign (not :=) so the outer bucket is the one returned below.
		bucket, err = storageService.CreateBucket(ctx, newBucket)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	return &csi.CreateVolumeResponse{Volume: bucketToCSIVolume(bucket)}, nil
}