func()

in pkg/csi_driver/node.go [74:162]


// NodeStageVolume mounts the Lustre filesystem described by the request's
// volume context at the request's staging target path.
//
// The mount source is built as "<ip>@tcp:/<fsname>" from the volume context
// and mounted with filesystem type "lustre", using the de-duplicated mount
// flags from the request's volume capability.
//
// It returns a gRPC status error with code:
//   - InvalidArgument when the volume ID, instance IP, filesystem name,
//     staging target path or volume capability is missing, or when the
//     capability fails the driver's validation;
//   - Aborted when the lock for the staging target path cannot be acquired;
//   - AlreadyExists when hasMountWithSameFSName reports an existing mount
//     for the same filesystem name;
//   - Internal when the mount check, directory creation or mount itself fails.
//
// If the staging target path is already mounted, the call succeeds without
// mounting again.
func (s *nodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	volumeID := req.GetVolumeId()
	if len(volumeID) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
	}

	// Named volCtx (not "context") so the imported context package is not shadowed.
	volCtx := req.GetVolumeContext()
	ip := volCtx[volumeContextIP]
	fsname := volCtx[volumeContextFSName]

	if len(ip) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Lustre instance IP is not provided")
	}

	if len(fsname) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Lustre filesystem name is not provided")
	}

	source := fmt.Sprintf("%s@tcp:/%s", ip, fsname)

	target := req.GetStagingTargetPath()
	if len(target) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Target path not provided")
	}

	volCap := req.GetVolumeCapability()
	if volCap == nil {
		return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
	}

	if err := s.driver.validateVolumeCapabilities([]*csi.VolumeCapability{volCap}); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// Hold the lock for this staging path for the rest of the call; if it
	// cannot be acquired, report Aborted instead of waiting.
	if acquired := s.volumeLocks.TryAcquire(target); !acquired {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, target)
	}
	defer s.volumeLocks.Release(target)

	// Collect the requested mount flags, skipping any flag that hasOption
	// reports as already present in the list.
	mountOptions := []string{}

	if m := volCap.GetMount(); m != nil {
		for _, f := range m.GetMountFlags() {
			if !hasOption(mountOptions, f) {
				mountOptions = append(mountOptions, f)
			}
		}
	}

	nodeName := s.driver.config.NodeID

	// Checking if the target directory is already mounted with a volume.
	mounted, err := s.isMounted(target)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not check if %q is mounted on node %s: %v", target, nodeName, err)
	}

	// An existing mount at the staging path is treated as success, so a
	// repeated request does not mount a second time.
	if mounted {
		klog.V(4).Infof("NodeStageVolume successfully mounted device %v to path %s on node %s, mount already exists.", volumeID, target, nodeName)

		return &csi.NodeStageVolumeResponse{}, nil
	}

	// Refuse to stage when a mountpoint with the same Lustre filesystem name
	// is already present on this node.
	hasFSName, err := s.hasMountWithSameFSName(fsname)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not check if there is an existing mountpoint with the same Lustre filesystem name %s on node %s: %v", fsname, nodeName, err)
	}
	if hasFSName {
		return nil, status.Errorf(codes.AlreadyExists, "A mountpoint with the same lustre filesystem name %q already exists on node %s. Please mount different lustre filesystems", fsname, nodeName)
	}

	klog.V(5).Infof("NodeStageVolume creating dir %s on node %s", target, nodeName)
	if err := makeDir(target); err != nil {
		return nil, status.Errorf(codes.Internal, "Could not create dir %s on node %s: %v", target, nodeName, err)
	}

	klog.V(4).Infof("NodeStageVolume mounting volume %s to path %s on node %s with mountOptions %v", volumeID, target, nodeName, mountOptions)
	if err := s.mounter.MountSensitiveWithoutSystemd(source, target, "lustre", mountOptions, nil); err != nil {
		// Best-effort cleanup of the staging path: a cleanup failure is only
		// logged, and the original mount error is what the caller receives.
		klog.Errorf("Mount %q failed on node %s, cleaning up", target, nodeName)
		if unmntErr := mount.CleanupMountPoint(target, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
			klog.Errorf("Unmount %q failed on node %s: %v", target, nodeName, unmntErr.Error())
		}

		return nil, status.Errorf(codes.Internal, "Could not mount %q at %q on node %s: %v", source, target, nodeName, err)
	}

	klog.V(4).Infof("NodeStageVolume successfully mounted volume %v to path %s on node %s", volumeID, target, nodeName)

	return &csi.NodeStageVolumeResponse{}, nil
}