in pkg/csi/manila/controllerserver.go [76:178]
// CreateVolume handles the CSI CreateVolume RPC.
//
// It validates the request, builds share options from the request parameters
// and OpenStack options from the request secrets, retrieves or creates the
// share through the volume creator selected for the request's content source,
// verifies that the resulting share is compatible with the request, and grants
// access to it through the share adapter for the configured protocol.
//
// On success the response carries the share ID as the volume ID, the capacity
// rounded to whole GiB, and a volume context containing the filtered request
// parameters plus "shareID" and "shareAccessID".
//
// Errors are reported as gRPC status errors:
//   - InvalidArgument: request validation, volume parameters or secrets are invalid.
//   - Aborted: another CreateVolume call for the same volume name is in flight.
//   - Unauthenticated: the Manila client could not be created.
//   - Internal: share type capabilities could not be retrieved, or granting
//     access failed for a reason other than a wait timeout.
//   - AlreadyExists: the share exists but is incompatible with the request.
//   - DeadlineExceeded: timed out waiting for the access rule.
//
// Errors from prepareShareMetadata, getVolumeCreator and the volume creator's
// create method are returned to the caller unchanged.
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	if err := validateCreateVolumeRequest(req); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// Configuration

	// NOTE: when the request carries parameters, the "protocol" entry below is
	// written into the very map returned by req.GetParameters().
	params := req.GetParameters()
	if params == nil {
		params = make(map[string]string)
	}

	// The protocol always comes from the driver configuration, overriding any
	// "protocol" value supplied in the request parameters.
	params["protocol"] = cs.d.shareProto

	shareOpts, err := options.NewControllerVolumeContext(params)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid volume parameters: %v", err)
	}

	shareMetadata, err := prepareShareMetadata(shareOpts.AppendShareMetadata, cs.d.clusterID)
	if err != nil {
		return nil, err
	}

	osOpts, err := options.NewOpenstackOptions(req.GetSecrets())
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid OpenStack secrets: %v", err)
	}

	// Check for pending CreateVolume for this volume name. The entry is removed
	// again when this call returns, whatever the outcome.
	if _, isPending := pendingVolumes.LoadOrStore(req.GetName(), true); isPending {
		return nil, status.Errorf(codes.Aborted, "volume %s is already being processed", req.GetName())
	}
	defer pendingVolumes.Delete(req.GetName())

	manilaClient, err := cs.d.manilaClientBuilder.New(osOpts)
	if err != nil {
		return nil, status.Errorf(codes.Unauthenticated, "failed to create Manila v2 client: %v", err)
	}

	shareTypeCaps, err := capabilities.GetManilaCapabilities(shareOpts.Type, manilaClient)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get Manila capabilities for share type %s: %v", shareOpts.Type, err)
	}

	requestedSize := req.GetCapacityRange().GetRequiredBytes()
	if requestedSize == 0 {
		// At least 1GiB
		requestedSize = 1 * bytesInGiB
	}

	sizeInGiB := bytesToGiB(requestedSize)

	// Retrieve an existing share or create a new one

	volCreator, err := getVolumeCreator(req.GetVolumeContentSource(), shareOpts, cs.d.compatOpts)
	if err != nil {
		return nil, err
	}

	share, err := volCreator.create(req, req.GetName(), sizeInGiB, manilaClient, shareOpts, shareMetadata)
	if err != nil {
		return nil, err
	}

	if err = verifyVolumeCompatibility(sizeInGiB, req, share, shareOpts, cs.d.compatOpts, shareTypeCaps); err != nil {
		return nil, status.Errorf(codes.AlreadyExists, "volume %s already exists, but is incompatible with the request: %v", req.GetName(), err)
	}

	// Grant access to the share

	ad := getShareAdapter(shareOpts.Protocol)

	accessRight, err := ad.GetOrGrantAccess(&shareadapters.GrantAccessArgs{Share: share, ManilaClient: manilaClient, Options: shareOpts})
	if err != nil {
		if err == wait.ErrWaitTimeout {
			// The access right returned alongside an error is not guaranteed to
			// be set; guard the dereference so that a timeout is reported as
			// DeadlineExceeded instead of panicking on a nil pointer.
			accessID := "<unknown>"
			if accessRight != nil {
				accessID = accessRight.ID
			}

			return nil, status.Errorf(codes.DeadlineExceeded, "deadline exceeded while waiting for access rule %s for volume %s to become available", accessID, share.Name)
		}

		return nil, status.Errorf(codes.Internal, "failed to grant access to volume %s: %v", share.Name, err)
	}

	var accessibleTopology []*csi.Topology
	if cs.d.withTopology {
		// All requisite/preferred topologies are considered valid. Nodes from those zones are required to be able to reach the storage.
		// The operator is responsible for making sure that provided topology keys are valid and present on the nodes of the cluster.
		// Only the preferred list of the request is echoed back here.
		accessibleTopology = req.GetAccessibilityRequirements().GetPreferred()
	}

	// Only the parameters relevant to the node plugin are propagated into the
	// volume context, together with the IDs of the share and its access rule.
	volCtx := filterParametersForVolumeContext(params, options.NodeVolumeContextFields())
	volCtx["shareID"] = share.ID
	volCtx["shareAccessID"] = accessRight.ID

	return &csi.CreateVolumeResponse{
		Volume: &csi.Volume{
			VolumeId:           share.ID,
			ContentSource:      req.GetVolumeContentSource(),
			AccessibleTopology: accessibleTopology,
			CapacityBytes:      int64(sizeInGiB) * bytesInGiB,
			VolumeContext:      volCtx,
		},
	}, nil
}