in pkg/csi/cinder/controllerserver.go [48:153]
// CreateVolume handles the CSI CreateVolume RPC by creating a Cinder volume,
// or by returning the already-existing volume when exactly one volume with the
// requested name and size is found.
//
// Validation: the request must carry a non-empty name and at least one volume
// capability, otherwise codes.InvalidArgument is returned.
//
// Sizing and placement: the size defaults to 1 GiB when the request has no
// capacity range and is converted to GiB with util.RoundUpSize. The volume
// type is read from the "type" parameter. The availability zone is read from
// the "availability" parameter and, when that is empty, derived from the
// accessibility requirements via getAZFromTopology.
//
// Existing volumes: one match by name with the same size is returned as-is,
// one match with a different size yields codes.AlreadyExists, and more than
// one match yields codes.Internal.
//
// Content source: a snapshot or source volume referenced by the request is
// looked up before creation; codes.NotFound is returned when it is missing
// and codes.Internal for any other lookup failure.
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	// Hand StripSecrets the request pointer rather than a dereferenced copy:
	// only the pointer type is a protobuf message, which the sanitizer needs in
	// order to find and redact secret fields, and this avoids copying the
	// message by value.
	klog.V(4).Infof("CreateVolume: called with args %+v", protosanitizer.StripSecrets(req))

	// Volume Name
	volName := req.GetName()
	volCapabilities := req.GetVolumeCapabilities()

	if len(volName) == 0 {
		return nil, status.Error(codes.InvalidArgument, "[CreateVolume] missing Volume Name")
	}

	// An empty (but non-nil) capability list is just as unusable as a nil one.
	if len(volCapabilities) == 0 {
		return nil, status.Error(codes.InvalidArgument, "[CreateVolume] missing Volume capability")
	}

	// Volume Size - Default is 1 GiB
	volSizeBytes := int64(1 * 1024 * 1024 * 1024)
	if req.GetCapacityRange() != nil {
		volSizeBytes = int64(req.GetCapacityRange().GetRequiredBytes())
	}
	volSizeGB := int(util.RoundUpSize(volSizeBytes, 1024*1024*1024))

	// Volume Type
	volType := req.GetParameters()["type"]

	// An explicitly requested availability zone wins; this is required in case
	// the volume AZ is different from the node AZ. Only when none is given is
	// the zone taken from the topology requirements.
	volAvailability := req.GetParameters()["availability"]
	if len(volAvailability) == 0 && req.GetAccessibilityRequirements() != nil {
		volAvailability = getAZFromTopology(req.GetAccessibilityRequirements())
	}

	cloud := cs.Cloud
	ignoreVolumeAZ := cloud.GetBlockStorageOpts().IgnoreVolumeAZ

	// Verify a volume with the provided name doesn't already exist for this tenant
	volumes, err := cloud.GetVolumesByName(volName)
	if err != nil {
		klog.Errorf("Failed to query for existing Volume during CreateVolume: %v", err)
		return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to get volumes: %s", err))
	}

	if len(volumes) > 1 {
		klog.V(3).Infof("found multiple existing volumes with selected name (%s) during create", volName)
		return nil, status.Error(codes.Internal, "Multiple volumes reported by Cinder with same name")
	}

	if len(volumes) == 1 {
		existing := &volumes[0]
		if volSizeGB != existing.Size {
			return nil, status.Error(codes.AlreadyExists, "Volume Already exists with same name and different capacity")
		}
		klog.V(4).Infof("Volume %s already exists in Availability Zone: %s of size %d GiB", existing.ID, existing.AvailabilityZone, existing.Size)
		return getCreateVolumeResponse(existing, ignoreVolumeAZ, req.GetAccessibilityRequirements()), nil
	}

	// Volume Create
	properties := map[string]string{cinderCSIClusterIDKey: cs.Driver.cluster}
	// Tag volume with metadata if present: https://github.com/kubernetes-csi/external-provisioner/pull/399
	for _, mKey := range []string{"csi.storage.k8s.io/pvc/name", "csi.storage.k8s.io/pvc/namespace", "csi.storage.k8s.io/pv/name"} {
		if v, ok := req.Parameters[mKey]; ok {
			properties[mKey] = v
		}
	}

	content := req.GetVolumeContentSource()
	var snapshotID string
	var sourcevolID string

	// Confirm the referenced snapshot exists before asking Cinder to create
	// a volume from it.
	if content != nil && content.GetSnapshot() != nil {
		snapshotID = content.GetSnapshot().GetSnapshotId()
		if _, err := cloud.GetSnapshotByID(snapshotID); err != nil {
			if cpoerrors.IsNotFound(err) {
				return nil, status.Errorf(codes.NotFound, "VolumeContentSource Snapshot %s not found", snapshotID)
			}
			return nil, status.Errorf(codes.Internal, "Failed to retrieve the snapshot %s: %v", snapshotID, err)
		}
	}

	// Same existence check for a source volume.
	if content != nil && content.GetVolume() != nil {
		sourcevolID = content.GetVolume().GetVolumeId()
		if _, err := cloud.GetVolume(sourcevolID); err != nil {
			if cpoerrors.IsNotFound(err) {
				return nil, status.Errorf(codes.NotFound, "Source Volume %s not found", sourcevolID)
			}
			return nil, status.Errorf(codes.Internal, "Failed to retrieve the source volume %s: %v", sourcevolID, err)
		}
	}

	vol, err := cloud.CreateVolume(volName, volSizeGB, volType, volAvailability, snapshotID, sourcevolID, &properties)
	if err != nil {
		klog.Errorf("Failed to CreateVolume: %v", err)
		return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume failed with error %v", err))
	}

	klog.V(4).Infof("CreateVolume: Successfully created volume %s in Availability Zone: %s of size %d GiB", vol.ID, vol.AvailabilityZone, vol.Size)

	return getCreateVolumeResponse(vol, ignoreVolumeAZ, req.GetAccessibilityRequirements()), nil
}