in ecs-agent/daemonimages/csidriver/driver/node.go [71:249]
// NodeStageVolume formats (only if needed) and mounts the device backing the
// requested volume at the request's staging target path.
//
// Behavior, in order:
//   - The request is validated; a missing volume ID, staging target, volume
//     capability or device path, an unsupported capability, an invalid volume
//     context, a nil mount capability, or an unknown filesystem type yields
//     codes.InvalidArgument.
//   - For block access type nothing is staged and an empty response is
//     returned immediately.
//   - If another operation for the same volume ID is already registered in
//     d.inFlight, codes.Aborted is returned.
//   - If the device already mounted at the staging target equals the resolved
//     source device, the call succeeds without formatting or mounting again.
//   - Otherwise the device is formatted/mounted and, if the mounter reports
//     that it is needed, the filesystem is resized.
//
// Failures while resolving the device, inspecting or creating the target
// directory, mounting, or resizing are reported as codes.Internal.
func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	klog.V(4).InfoS("NodeStageVolume: called", "args", *req)

	volumeID := req.GetVolumeId()
	if len(volumeID) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
	}

	target := req.GetStagingTargetPath()
	if len(target) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
	}

	volCap := req.GetVolumeCapability()
	if volCap == nil {
		return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
	}
	if !isValidVolumeCapabilities([]*csi.VolumeCapability{volCap}) {
		return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
	}

	volumeContext := req.GetVolumeContext()
	if !isValidVolumeContext(volumeContext) {
		return nil, status.Error(codes.InvalidArgument, "Volume Attribute is not valid")
	}

	// If the access type is block, do nothing for stage.
	if _, isBlock := volCap.GetAccessType().(*csi.VolumeCapability_Block); isBlock {
		return &csi.NodeStageVolumeResponse{}, nil
	}

	mountVolume := volCap.GetMount()
	if mountVolume == nil {
		return nil, status.Error(codes.InvalidArgument, "NodeStageVolume: mount is nil within volume capability")
	}

	fsType := mountVolume.GetFsType()
	if len(fsType) == 0 {
		fsType = defaultFsType
	}
	// NOTE(review): the lookup below is case-insensitive, but fsType is used
	// with its original casing afterwards (including the comparisons against
	// FSTypeXfs) — confirm callers always pass lower-case values.
	if _, ok := ValidFSTypes[strings.ToLower(fsType)]; !ok {
		return nil, status.Errorf(codes.InvalidArgument, "NodeStageVolume: invalid fstype %s", fsType)
	}

	// Re-check the formatting parameters carried in the volume context.
	// The already-fetched volumeContext is reused here so that no local
	// variable shadows the imported "context" package.
	blockSize, err := recheckParameter(volumeContext, BlockSizeKey, FileSystemConfigs, fsType)
	if err != nil {
		return nil, err
	}
	inodeSize, err := recheckParameter(volumeContext, InodeSizeKey, FileSystemConfigs, fsType)
	if err != nil {
		return nil, err
	}
	bytesPerInode, err := recheckParameter(volumeContext, BytesPerInodeKey, FileSystemConfigs, fsType)
	if err != nil {
		return nil, err
	}
	numInodes, err := recheckParameter(volumeContext, NumberOfInodesKey, FileSystemConfigs, fsType)
	if err != nil {
		return nil, err
	}

	mountOptions := collectMountOptions(fsType, mountVolume.MountFlags)

	// Reject the request if an operation for this volume is already
	// registered; otherwise hold the registration until this call returns.
	if !d.inFlight.Insert(volumeID) {
		return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
	}
	defer func() {
		klog.V(4).InfoS("NodeStageVolume: volume operation finished", "volumeID", volumeID)
		d.inFlight.Delete(volumeID)
	}()

	devicePath, ok := req.PublishContext[DevicePathKey]
	if !ok {
		return nil, status.Error(codes.InvalidArgument, "Device path not provided")
	}

	// A partition value of "0" is ignored (logged only); any other value is
	// passed on to the device lookup.
	partition := ""
	if part, ok := volumeContext[VolumeAttributePartition]; ok {
		if part != "0" {
			partition = part
		} else {
			klog.V(4).InfoS("NodeStageVolume: invalid partition config, will ignore.", "partition", part)
		}
	}

	source, err := d.findDevicePath(devicePath, volumeID, partition)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Failed to find device path %s. %v", devicePath, err)
	}

	exists, err := d.mounter.PathExists(target)
	if err != nil {
		// The existence check itself failed; this is not the same as the
		// path being absent, so the log message says so.
		klog.V(4).InfoS("NodeStageVolume: failed to check whether path exists", "target", target, "err", err)
		msg := fmt.Sprintf("failed to check if target %q exists: %v", target, err)
		return nil, status.Error(codes.Internal, msg)
	}
	klog.V(4).InfoS("NodeStageVolume: path exists:", "exists", exists)

	// When exists is true it means target path was created but device isn't mounted.
	// We don't want to do anything in that case and let the operation proceed.
	// Otherwise we need to create the target directory.
	if !exists {
		// If target path does not exist we need to create the directory where volume will be staged.
		klog.V(4).InfoS("NodeStageVolume: creating target dir", "target", target)
		if err = d.mounter.MakeDir(target); err != nil {
			msg := fmt.Sprintf("could not create target dir %q: %v", target, err)
			return nil, status.Error(codes.Internal, msg)
		}
	}

	// Check if a device is mounted in target directory.
	device, _, err := d.mounter.GetDeviceNameFromMount(target)
	if err != nil {
		msg := fmt.Sprintf("failed to check if volume is already mounted: %v", err)
		return nil, status.Error(codes.Internal, msg)
	}
	klog.V(4).InfoS("NodeStageVolume: find device path", "device", device)

	// This operation (NodeStageVolume) MUST be idempotent.
	// If the volume corresponding to the volume_id is already staged to the staging_target_path,
	// and is identical to the specified volume_capability the Plugin MUST reply 0 OK.
	klog.V(4).InfoS("NodeStageVolume: checking if volume is already staged", "device", device, "source", source, "target", target)
	if device == source {
		klog.V(4).InfoS("NodeStageVolume: volume already staged", "volumeID", volumeID)
		return &csi.NodeStageVolumeResponse{}, nil
	}

	// FormatAndMount will format only if needed.
	klog.V(4).InfoS("NodeStageVolume: staging volume", "source", source, "volumeID", volumeID, "target", target, "fstype", fsType)

	// Build the format options. For xfs the block size and inode size are
	// passed as "size=<value>" and the inode size flag is "-i" instead of "-I".
	formatOptions := []string{}
	if len(blockSize) > 0 {
		if fsType == FSTypeXfs {
			blockSize = "size=" + blockSize
		}
		formatOptions = append(formatOptions, "-b", blockSize)
	}
	if len(inodeSize) > 0 {
		option := "-I"
		if fsType == FSTypeXfs {
			option, inodeSize = "-i", "size="+inodeSize
		}
		formatOptions = append(formatOptions, option, inodeSize)
	}
	if len(bytesPerInode) > 0 {
		formatOptions = append(formatOptions, "-i", bytesPerInode)
	}
	if len(numInodes) > 0 {
		formatOptions = append(formatOptions, "-N", numInodes)
	}

	err = d.mounter.FormatAndMountSensitiveWithFormatOptions(source, target, fsType, mountOptions, nil, formatOptions)
	if err != nil {
		klog.V(4).InfoS("NodeStageVolume: format mount fail", "error", err)
		msg := fmt.Sprintf("could not format %q and mount it at %q: %v", source, target, err)
		return nil, status.Error(codes.Internal, msg)
	}

	needResize, err := d.mounter.NeedResize(source, target)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not determine if volume %q (%q) need to be resized: %v", volumeID, source, err)
	}
	if needResize {
		r, err := d.mounter.NewResizeFs()
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Error attempting to create new ResizeFs: %v", err)
		}
		klog.V(2).InfoS("Volume needs resizing", "source", source)
		if _, err := r.Resize(source, target); err != nil {
			return nil, status.Errorf(codes.Internal, "Could not resize volume %q (%q): %v", volumeID, source, err)
		}
	}

	klog.V(4).InfoS("NodeStageVolume: successfully staged volume", "source", source, "volumeID", volumeID, "target", target, "fstype", fsType)
	return &csi.NodeStageVolumeResponse{}, nil
}