in pkg/csi_driver/node.go [74:162]
// NodeStageVolume mounts the Lustre filesystem identified by the request's
// volume context (instance IP + filesystem name) at the staging target path.
// It is idempotent: if the target is already mounted it returns success
// without remounting. Returns InvalidArgument for missing request fields,
// Aborted if another operation on the same target is in flight, AlreadyExists
// if a mount with the same Lustre fsname is already present on the node, and
// Internal for mount/filesystem failures.
func (s *nodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	volumeID := req.GetVolumeId()
	if len(volumeID) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
	}
	// Renamed from "context" to avoid shadowing the standard context package
	// used in this function's own signature.
	volContext := req.GetVolumeContext()
	ip := volContext[volumeContextIP]
	fsname := volContext[volumeContextFSName]
	if len(ip) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Lustre instance IP is not provided")
	}
	if len(fsname) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Lustre filesystem name is not provided")
	}
	// Lustre mount source has the form "<ip>@tcp:/<fsname>".
	source := fmt.Sprintf("%s@tcp:/%s", ip, fsname)
	target := req.GetStagingTargetPath()
	if len(target) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Target path not provided")
	}
	volCap := req.GetVolumeCapability()
	if volCap == nil {
		return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
	}
	if err := s.driver.validateVolumeCapabilities([]*csi.VolumeCapability{volCap}); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	// Serialize concurrent operations on the same staging target.
	if acquired := s.volumeLocks.TryAcquire(target); !acquired {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, target)
	}
	defer s.volumeLocks.Release(target)

	// Collect de-duplicated mount flags from the capability, if any.
	mountOptions := []string{}
	if m := volCap.GetMount(); m != nil {
		for _, f := range m.GetMountFlags() {
			if !hasOption(mountOptions, f) {
				mountOptions = append(mountOptions, f)
			}
		}
	}
	nodeName := s.driver.config.NodeID

	// Idempotency: if the target directory is already mounted, report success.
	mounted, err := s.isMounted(target)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not check if %q is mounted on node %s: %v", target, nodeName, err)
	}
	if mounted {
		klog.V(4).Infof("NodeStageVolume successfully mounted device %v to path %s on node %s, mount already exists.", volumeID, target, nodeName)
		return &csi.NodeStageVolumeResponse{}, nil
	}

	// Lustre does not support mounting two filesystems with the same fsname
	// on one node; reject early with AlreadyExists.
	hasFSName, err := s.hasMountWithSameFSName(fsname)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not check if there is an existing mountpoint with the same Lustre filesystem name %s on node %s: %v", fsname, nodeName, err)
	}
	if hasFSName {
		return nil, status.Errorf(codes.AlreadyExists, "A mountpoint with the same lustre filesystem name %q already exists on node %s. Please mount different lustre filesystems", fsname, nodeName)
	}

	klog.V(5).Infof("NodeStageVolume creating dir %s on node %s", target, nodeName)
	if err := makeDir(target); err != nil {
		return nil, status.Errorf(codes.Internal, "Could not create dir %s on node %s: %v", target, nodeName, err)
	}

	klog.V(4).Infof("NodeStageVolume mounting volume %s to path %s on node %s with mountOptions %v", volumeID, target, nodeName, mountOptions)
	if err := s.mounter.MountSensitiveWithoutSystemd(source, target, "lustre", mountOptions, nil); err != nil {
		// Best-effort cleanup of the target so a retry starts from a clean state.
		klog.Errorf("Mount %q failed on node %s, cleaning up", target, nodeName)
		if unmntErr := mount.CleanupMountPoint(target, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
			// Pass the error value itself to %v rather than unmntErr.Error().
			klog.Errorf("Unmount %q failed on node %s: %v", target, nodeName, unmntErr)
		}
		return nil, status.Errorf(codes.Internal, "Could not mount %q at %q on node %s: %v", source, target, nodeName, err)
	}

	klog.V(4).Infof("NodeStageVolume successfully mounted volume %v to path %s on node %s", volumeID, target, nodeName)
	return &csi.NodeStageVolumeResponse{}, nil
}