func()

in pkg/csi_driver/node.go [189:284]


func (s *nodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	volumeID := req.GetVolumeId()
	if len(volumeID) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
	}

	stagingTargetPath := req.GetStagingTargetPath()
	if len(stagingTargetPath) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Staging target path not provided")
	}

	targetPath := req.GetTargetPath()
	if len(targetPath) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Target path not provided")
	}

	volCap := req.GetVolumeCapability()
	if volCap == nil {
		return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
	}

	if err := s.driver.validateVolumeCapabilities([]*csi.VolumeCapability{req.GetVolumeCapability()}); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	if acquired := s.volumeLocks.TryAcquire(targetPath); !acquired {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, targetPath)
	}
	defer s.volumeLocks.Release(targetPath)

	mountOptions := []string{"bind"}
	ro := req.GetReadonly()
	if ro {
		mountOptions = append(mountOptions, "ro")
	}

	var fsGroup string
	if m := volCap.GetMount(); m != nil {
		for _, f := range m.GetMountFlags() {
			if !hasOption(mountOptions, f) {
				mountOptions = append(mountOptions, f)
			}
		}

		if m.GetVolumeMountGroup() != "" {
			fsGroup = m.GetVolumeMountGroup()
		}
	}

	nodeName := s.driver.config.NodeID

	mounted, err := s.isMounted(targetPath)
	if err != nil {
		return nil, err
	}

	if mounted {
		if err := setVolumeOwnershipTopLevel(volumeID, targetPath, fsGroup, ro); err != nil {
			klog.Infof("setVolumeOwnershipTopLevel failed for volume %q, path %q, fsGroup %q, cleaning up mount point on node %s", volumeID, targetPath, fsGroup, nodeName)
			if unmntErr := mount.CleanupMountPoint(targetPath, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
				klog.Errorf("Unmount %q failed on node %s: %v", targetPath, nodeName, unmntErr.Error())
			}

			return nil, status.Error(codes.Internal, err.Error())
		}

		return &csi.NodePublishVolumeResponse{}, nil
	}

	klog.V(5).Infof("NodePublishVolume creating dir %s on node %s", targetPath, nodeName)
	if err := makeDir(targetPath); err != nil {
		return nil, status.Errorf(codes.Internal, "Could not create dir %q on node %s: %v", targetPath, nodeName, err)
	}

	if err := s.mounter.MountSensitiveWithoutSystemd(stagingTargetPath, targetPath, "lustre", mountOptions, nil); err != nil {
		klog.Errorf("Mount %q failed on node %s, cleaning up", targetPath, nodeName)
		if unmntErr := mount.CleanupMountPoint(targetPath, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
			klog.Errorf("Unmount %q failed on node %s: %v", targetPath, nodeName, unmntErr.Error())
		}

		return nil, status.Errorf(codes.Internal, "mount %q failed on node %s: %v", targetPath, nodeName, err.Error())
	}

	klog.V(4).Infof("NodePublishVolume successfully mounted %s on node %s", targetPath, nodeName)

	if err := setVolumeOwnershipTopLevel(volumeID, targetPath, fsGroup, ro); err != nil {
		klog.Infof("setVolumeOwnershipTopLevel failed for volume %q, path %q, fsGroup %q, cleaning up mount point on node %s", volumeID, targetPath, fsGroup, nodeName)
		if unmntErr := mount.CleanupMountPoint(targetPath, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
			klog.Errorf("Unmount %q failed on node %s: %v", targetPath, nodeName, unmntErr.Error())
		}

		return nil, status.Error(codes.Internal, err.Error())
	}

	return &csi.NodePublishVolumeResponse{}, nil
}