in pkg/csi_driver/node.go [82:230]
func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// Rate limit NodePublishVolume calls to avoid kube API throttling.
if err := s.limiter.Wait(ctx); err != nil {
return nil, status.Errorf(codes.Aborted, "NodePublishVolume request is aborted due to rate limit: %v", err)
}
// Validate arguments
targetPath, bucketName, fuseMountOptions, skipBucketAccessCheck, disableMetricsCollection, err := parseRequestArguments(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
klog.V(6).Infof("NodePublishVolume on volume %q has skipBucketAccessCheck %t", bucketName, skipBucketAccessCheck)
if err := s.driver.validateVolumeCapabilities([]*csi.VolumeCapability{req.GetVolumeCapability()}); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// Acquire a lock on the target path instead of the volume ID, since we do not want to serialize multiple node publish calls on the same volume.
if acquired := s.volumeLocks.TryAcquire(targetPath); !acquired {
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, targetPath)
}
defer s.volumeLocks.Release(targetPath)
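// The volume context carries the Pod's namespace and name, plus other publish-time configuration.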
vc := req.GetVolumeContext()
// Check if the given Service Account has access to the GCS bucket, and if the bucket exists.
// Skip the check if it has already succeeded for this target path.
if bucketName != "_" && !skipBucketAccessCheck {
// Use the target path as the volume identifier, because it uniquely corresponds to a Pod and volume pair.
// Pods may belong to different namespaces and need their own access checks.
vs, ok := s.volumeStateStore.Load(targetPath)
if !ok {
s.volumeStateStore.Store(targetPath, &util.VolumeState{})
vs, _ = s.volumeStateStore.Load(targetPath)
}
// The volume state is safe to access for the remainder of this function, since volumeLocks prevents
// NodePublishVolume/NodeUnpublishVolume calls from running more than once at a time per target path.
if !vs.BucketAccessCheckPassed {
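// Create a storage service client so bucket existence and access can be verified.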
storageService, err := s.prepareStorageService(ctx, vc)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "failed to prepare storage service: %v", err)
}
defer storageService.Close()
if exist, err := storageService.CheckBucketExists(ctx, &storage.ServiceBucket{Name: bucketName}); !exist {
return nil, status.Errorf(storage.ParseErrCode(err), "failed to get GCS bucket %q: %v", bucketName, err)
}
vs.BucketAccessCheckPassed = true
}
}
// Check if the sidecar container was injected into the Pod
pod, err := s.k8sClients.GetPod(vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName])
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to get pod: %v", err)
}
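// If the token server should be started for a Pod on the host network, pass the identity provider to gcsfuse as a mount option.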
if s.shouldStartTokenServer(pod) && pod.Spec.HostNetwork {
identityProvider := s.driver.config.TokenManager.GetIdentityProvider()
fuseMountOptions = joinMountOptions(fuseMountOptions, []string{"token-server-identity-provider=" + identityProvider})
}
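// Look up the node to check whether the GKE metadata server (Workload Identity) is enabled on it.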
node, err := s.k8sClients.GetNode(s.driver.config.NodeID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to get node: %v", err)
}
val, ok := node.Labels[clientset.GkeMetaDataServerKey]
// If Workload Identity is not enabled, the label should be missing and val will be empty; the explicit !ok check is redundant but kept for extra caution.
isWorkloadIdentityDisabled := val != "true" || !ok
if isWorkloadIdentityDisabled && !pod.Spec.HostNetwork {
return nil, status.Errorf(codes.FailedPrecondition, "Workload Identity Federation is not enabled on node. Please make sure this is enabled on both cluster and node pool level (https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity)")
}
// Since the mutating webhook ordering is not deterministic,
// the sidecar position is not checked in the ValidatePodHasSidecarContainerInjected func.
shouldInjectedByWebhook := strings.ToLower(pod.Annotations[webhook.GcsFuseVolumeEnableAnnotation]) == util.TrueStr
sidecarInjected, isInitContainer := webhook.ValidatePodHasSidecarContainerInjected(pod)
if !sidecarInjected {
if shouldInjectedByWebhook {
return nil, status.Error(codes.Internal, "the webhook failed to inject the sidecar container into the Pod spec")
}
return nil, status.Error(codes.FailedPrecondition, "failed to find the sidecar container in Pod spec")
}
// Register the metrics collector.
// Registering the same collector again during node republish calls is idempotent.
if s.driver.config.MetricsManager != nil && !disableMetricsCollection {
klog.V(6).Infof("NodePublishVolume enabling metrics collector for target path %q", targetPath)
s.driver.config.MetricsManager.RegisterMetricsCollector(targetPath, pod.Namespace, pod.Name, bucketName)
}
// Check if the sidecar container is still required;
// if not, write an exit file to the emptyDir path to
// notify the sidecar container to exit.
if !isInitContainer {
if err := putExitFile(pod, targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
// Check if there is any error from gcsfuse.
code, err := checkGcsFuseErr(isInitContainer, pod, targetPath)
if code != codes.OK {
if code == codes.Canceled {
klog.V(4).Infof("NodePublishVolume on volume %q to target path %q is not needed because the gcsfuse has terminated.", bucketName, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
return nil, status.Error(code, err.Error())
}
// Check if there is any error from the sidecar container
code, err = checkSidecarContainerErr(isInitContainer, pod)
if code != codes.OK {
return nil, status.Error(code, err.Error())
}
// TODO: Check if the socket listener timed out
// Check if the target path is already mounted
mounted, err := s.isDirMounted(targetPath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check if path %q is already mounted: %v", targetPath, err)
}
if mounted {
klog.V(4).Infof("NodePublishVolume succeeded on volume %q to target path %q, mount already exists.", bucketName, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
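// Ensure the target path directory exists before mounting.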
klog.V(4).Infof("NodePublishVolume attempting mkdir for path %q", targetPath)
if err := os.MkdirAll(targetPath, 0o750); err != nil {
return nil, status.Errorf(codes.Internal, "mkdir failed for path %q: %v", targetPath, err)
}
// Mount the bucket to the target path using gcsfuse.
if err = s.mounter.Mount(bucketName, targetPath, FuseMountType, fuseMountOptions); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount volume %q to target path %q: %v", bucketName, targetPath, err)
}
klog.V(4).Infof("NodePublishVolume succeeded on volume %q to target path %q", bucketName, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}