in pkg/csi_driver/node.go [189:284]
// NodePublishVolume bind-mounts the request's staging target path onto its
// target path and then calls setVolumeOwnershipTopLevel with the volume mount
// group taken from the request's mount capability (empty if none is set).
//
// The request is rejected with codes.InvalidArgument when the volume ID,
// staging target path, target path or volume capability is missing, or when
// the driver's capability validation fails. If another operation currently
// holds the lock for the same target path, codes.Aborted is returned.
//
// When the target path is already mounted, the directory creation and mount
// steps are skipped and only the ownership step runs. If either the mount or
// the ownership step fails, the mount point is cleaned up on a best-effort
// basis (a cleanup failure is only logged) and codes.Internal is returned.
func (s *nodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	volumeID := req.GetVolumeId()
	stagingTargetPath := req.GetStagingTargetPath()
	targetPath := req.GetTargetPath()
	capability := req.GetVolumeCapability()

	// Validate required fields in a fixed order so the first missing one is reported.
	switch {
	case volumeID == "":
		return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
	case stagingTargetPath == "":
		return nil, status.Error(codes.InvalidArgument, "Staging target path not provided")
	case targetPath == "":
		return nil, status.Error(codes.InvalidArgument, "Target path not provided")
	case capability == nil:
		return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
	}

	if capErr := s.driver.validateVolumeCapabilities([]*csi.VolumeCapability{capability}); capErr != nil {
		return nil, status.Error(codes.InvalidArgument, capErr.Error())
	}

	// Only one operation per target path may be in flight at a time.
	if !s.volumeLocks.TryAcquire(targetPath) {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, targetPath)
	}
	defer s.volumeLocks.Release(targetPath)

	readOnly := req.GetReadonly()
	opts := []string{"bind"}
	if readOnly {
		opts = append(opts, "ro")
	}

	// Merge caller-supplied mount flags, skipping any option already present.
	var mountGroup string
	if mnt := capability.GetMount(); mnt != nil {
		for _, flag := range mnt.GetMountFlags() {
			if hasOption(opts, flag) {
				continue
			}
			opts = append(opts, flag)
		}
		mountGroup = mnt.GetVolumeMountGroup()
	}

	node := s.driver.config.NodeID

	alreadyMounted, err := s.isMounted(targetPath)
	if err != nil {
		return nil, err
	}

	// Create the directory and mount only when the target is not mounted yet.
	if !alreadyMounted {
		klog.V(5).Infof("NodePublishVolume creating dir %s on node %s", targetPath, node)
		if dirErr := makeDir(targetPath); dirErr != nil {
			return nil, status.Errorf(codes.Internal, "Could not create dir %q on node %s: %v", targetPath, node, dirErr)
		}

		if mntErr := s.mounter.MountSensitiveWithoutSystemd(stagingTargetPath, targetPath, "lustre", opts, nil); mntErr != nil {
			klog.Errorf("Mount %q failed on node %s, cleaning up", targetPath, node)
			if unmntErr := mount.CleanupMountPoint(targetPath, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
				klog.Errorf("Unmount %q failed on node %s: %v", targetPath, node, unmntErr.Error())
			}
			return nil, status.Errorf(codes.Internal, "mount %q failed on node %s: %v", targetPath, node, mntErr.Error())
		}
		klog.V(4).Infof("NodePublishVolume successfully mounted %s on node %s", targetPath, node)
	}

	// The ownership step runs for both a fresh mount and a pre-existing one;
	// on failure the mount point is torn down before reporting the error.
	if ownErr := setVolumeOwnershipTopLevel(volumeID, targetPath, mountGroup, readOnly); ownErr != nil {
		klog.Infof("setVolumeOwnershipTopLevel failed for volume %q, path %q, fsGroup %q, cleaning up mount point on node %s", volumeID, targetPath, mountGroup, node)
		if unmntErr := mount.CleanupMountPoint(targetPath, s.mounter, false /* extensiveMountPointCheck */); unmntErr != nil {
			klog.Errorf("Unmount %q failed on node %s: %v", targetPath, node, unmntErr.Error())
		}
		return nil, status.Error(codes.Internal, ownErr.Error())
	}

	return &csi.NodePublishVolumeResponse{}, nil
}