func()

in pkg/nfs/nodeserver.go [55:184]


// NodePublishVolume mounts the NFS export backing the requested volume at the
// request's target path.
//
// The request must carry a volume capability, a volume ID and a target path.
// Mount flags come from the volume capability, "ro" is appended for read-only
// requests, and further settings (share, sub-directory, extra mount options,
// mount permissions, PV/PVC metadata used for sub-directory substitution) are
// read from the volume context. The NFS server address is taken from the
// publish context under lbcontroller.NodeAnnotation. The resulting source is
// "<ip>:<share>" with "/<subDir>" appended when a sub-directory is set.
//
// If the target path is already a mount point the call returns success
// without mounting again.
//
// Errors are returned as gRPC statuses:
//   - InvalidArgument: a required request field or the share parameter is
//     missing, mountPermissions is not a valid octal number, the server IP is
//     absent or empty in the publish context, the read-ahead mount flag is
//     rejected, or the mount fails with an "invalid argument" error.
//   - Aborted: another operation holds the lock for this volume/target pair.
//   - PermissionDenied: the mount fails with a permission error.
//   - Internal: any other failure (stat, mkdir, mount, chmod, read-ahead update).
func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	volCap := req.GetVolumeCapability()
	if volCap == nil {
		return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
	}
	volumeID := req.GetVolumeId()
	if len(volumeID) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
	}
	targetPath := req.GetTargetPath()
	if len(targetPath) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Target path not provided")
	}

	// Serialize operations per (volume, target path) pair; a concurrent caller
	// is told to retry rather than being blocked.
	lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
	if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
		return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
	}
	defer ns.Driver.volumeLocks.Release(lockKey)

	mountOptions := volCap.GetMount().GetMountFlags()
	if req.GetReadonly() {
		mountOptions = append(mountOptions, "ro")
	}

	var server, baseDir, subDir string
	subDirReplaceMap := map[string]string{}

	// Start from the driver-wide default; the volume context may override it.
	mountPermissions := ns.Driver.mountPermissions
	for k, v := range req.GetVolumeContext() {
		// Keys are matched case-insensitively.
		switch strings.ToLower(k) {
		case paramServer:
			server = v
		case paramShare:
			baseDir = v
		case paramSubDir:
			subDir = v
		case pvcNamespaceKey:
			subDirReplaceMap[pvcNamespaceMetadata] = v
		case pvcNameKey:
			subDirReplaceMap[pvcNameMetadata] = v
		case pvNameKey:
			subDirReplaceMap[pvNameMetadata] = v
		case mountOptionsField:
			if v != "" {
				mountOptions = append(mountOptions, v)
			}
		case mountPermissionsField:
			if v != "" {
				var err error
				// Permissions are given in octal (e.g. "0777").
				if mountPermissions, err = strconv.ParseUint(v, 8, 32); err != nil {
					// v is user input: pass it as an argument, never as part
					// of the format string.
					return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", v)
				}
			}
		}
	}

	if baseDir == "" {
		return nil, status.Errorf(codes.InvalidArgument, "%v is a required parameter", paramShare)
	}

	pc := req.GetPublishContext()
	ip, ok := pc[lbcontroller.NodeAnnotation]
	if !ok || ip == "" {
		// An empty IP would produce the unusable source ":<share>", so it is
		// treated the same as a missing entry.
		return nil, status.Errorf(codes.InvalidArgument, "NFS server IP not found in PublishContext %v for volume %q", pc, volumeID)
	}

	klog.Infof("NodePublishVolume found IP %q from PublishContext for volume %q", ip, volumeID)
	// The publish-context IP always replaces any server named in the volume
	// context.
	server = ip

	source := fmt.Sprintf("%s:%s", server, baseDir)
	if subDir != "" {
		// replace pv/pvc name namespace metadata in subDir
		subDir = replaceWithMap(subDir, subDirReplaceMap)

		// Avoid a doubled slash when the share already ends in "/".
		source = strings.TrimRight(source, "/")
		source = fmt.Sprintf("%s/%s", source, subDir)
	}

	notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, status.Error(codes.Internal, err.Error())
		}
		// The target does not exist yet: create it so it can be mounted on.
		if err := os.MkdirAll(targetPath, os.FileMode(mountPermissions)); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		notMnt = true
	}
	if !notMnt {
		// Already mounted: publishing is idempotent.
		return &csi.NodePublishVolumeResponse{}, nil
	}

	// Validate the read-ahead flag BEFORE mounting. If this ran after the
	// mount, a bad value would return an error while leaving the volume
	// mounted, and the next retry would hit the "already mounted" path above
	// and report success.
	// NOTE(review): assumes extractReadAheadKBMountFlag only inspects
	// mountOptions and has no side effects — confirm against its definition.
	readAheadKB, shouldUpdateReadAhead, err := extractReadAheadKBMountFlag(mountOptions)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions)
	// NOTE(review): collectMountOptions presumably filters out driver-only
	// options (such as the read-ahead setting) before they reach mount —
	// confirm against its definition.
	filteredMountOptions := collectMountOptions(mountOptions)
	if err := ns.mounter.Mount(source, targetPath, "nfs", filteredMountOptions); err != nil {
		if os.IsPermission(err) {
			return nil, status.Error(codes.PermissionDenied, err.Error())
		}
		if strings.Contains(err.Error(), "invalid argument") {
			return nil, status.Error(codes.InvalidArgument, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	if mountPermissions > 0 {
		if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	} else {
		klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
	}

	if shouldUpdateReadAhead {
		if err := updateReadAhead(targetPath, readAheadKB); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
	return &csi.NodePublishVolumeResponse{}, nil
}