in pkg/csi_mounter/csi_mounter.go [76:150]
func (m *Mounter) Mount(source string, target string, fstype string, options []string) error {
m.mux.Lock()
defer m.mux.Unlock()
csiMountOptions, sidecarMountOptions, sysfsBDI, err := prepareMountOptions(options)
if err != nil {
return err
}
// Prepare sidecar mounter MountConfig
mc := sidecarmounter.MountConfig{
BucketName: source,
Options: sidecarMountOptions,
}
msg, err := json.Marshal(mc)
if err != nil {
return fmt.Errorf("failed to marshal sidecar mounter MountConfig %v: %w", mc, err)
}
podID, volumeName, _ := util.ParsePodIDVolumeFromTargetpath(target)
logPrefix := fmt.Sprintf("[Pod %v, Volume %v, Bucket %v]", podID, volumeName, source)
klog.V(4).Infof("%v opening the device /dev/fuse", logPrefix)
fd, err := syscall.Open("/dev/fuse", syscall.O_RDWR, 0o644)
if err != nil {
return fmt.Errorf("failed to open the device /dev/fuse: %w", err)
}
csiMountOptions = append(csiMountOptions, fmt.Sprintf("fd=%v", fd))
klog.V(4).Infof("%v mounting the fuse filesystem", logPrefix)
err = m.MountSensitiveWithoutSystemdWithMountFlags(source, target, fstype, csiMountOptions, nil, []string{"--internal-only"})
if err != nil {
return fmt.Errorf("failed to mount the fuse filesystem: %w", err)
}
if len(sysfsBDI) != 0 {
go func() {
// updateSysfsConfig may hang until the file descriptor (fd) is either consumed or canceled.
// It will succeed once dfuse finishes the mount process, or it will fail if dfuse fails
// or the mount point is cleaned up due to mounting failures.
if err := updateSysfsConfig(target, sysfsBDI); err != nil {
klog.Errorf("%v failed to update kernel parameters: %v", logPrefix, err)
}
}()
}
listener, err := m.createSocket(target, logPrefix)
if err != nil {
// If mount failed at this step,
// cleanup the mount point and allow the CSI driver NodePublishVolume to retry.
klog.Warningf("%v failed to create socket, clean up the mount point", logPrefix)
syscall.Close(fd)
if m.UnmountWithForce(target, time.Second*5) != nil {
klog.Warningf("%v failed to clean up the mount point", logPrefix)
}
return err
}
// Close the listener and fd after 1 hour timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
go func() {
<-ctx.Done()
klog.V(4).Infof("%v closing the socket and fd", logPrefix)
listener.Close()
syscall.Close(fd)
}()
// Asynchronously waiting for the sidecar container to connect to the listener
go startAcceptConn(listener, logPrefix, msg, fd, cancel)
return nil
}