func()

in pkg/csi_driver/node.go [82:230]


// NodePublishVolume mounts the bucket named in the request at the request's
// target path.
//
// The call runs the following steps in order and returns a gRPC status error
// as soon as one of them fails:
//
//  1. Wait on the rate limiter (codes.Aborted on failure).
//  2. Parse the request arguments and validate the volume capability
//     (codes.InvalidArgument).
//  3. Take the lock for the target path (codes.Aborted if it is already held).
//  4. Unless skipped, or already passed for this target path, verify that the
//     bucket can be reached.
//  5. Fetch the Pod and the Node, then check the Workload Identity label and
//     the sidecar container injection.
//  6. Register the metrics collector unless metrics collection is disabled.
//  7. Surface errors reported by checkGcsFuseErr and checkSidecarContainerErr.
//  8. Create the target directory and mount, unless it is already mounted.
//
// An empty response with a nil error is returned when the mount succeeds, when
// the target path is already mounted, and when checkGcsFuseErr reports
// codes.Canceled.
func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	// Rate limit NodePublishVolume calls to avoid kube API throttling.
	if err := s.limiter.Wait(ctx); err != nil {
		return nil, status.Errorf(codes.Aborted, "NodePublishVolume request is aborted due to rate limit: %v", err)
	}

	// Validate arguments.
	targetPath, bucketName, fuseMountOptions, skipBucketAccessCheck, disableMetricsCollection, err := parseRequestArguments(req)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	klog.V(6).Infof("NodePublishVolume on volume %q has skipBucketAccessCheck %t", bucketName, skipBucketAccessCheck)

	if err := s.driver.validateVolumeCapabilities([]*csi.VolumeCapability{req.GetVolumeCapability()}); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// Acquire a lock on the target path instead of volumeID, since we do not want
	// to serialize multiple node publish calls on the same volume.
	if acquired := s.volumeLocks.TryAcquire(targetPath); !acquired {
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, targetPath)
	}
	defer s.volumeLocks.Release(targetPath)

	vc := req.GetVolumeContext()

	// Check that the bucket exists and can be reached with the credentials derived
	// from the volume context. The check is skipped once it has succeeded for this
	// target path. The bucket name "_" is excluded from the check.
	if bucketName != "_" && !skipBucketAccessCheck {
		// Use the target path as the volume identifier because it corresponds to Pods
		// and volumes. Pods may belong to different namespaces and would need their
		// own access check.
		vs, ok := s.volumeStateStore.Load(targetPath)
		if !ok {
			s.volumeStateStore.Store(targetPath, &util.VolumeState{})
			vs, _ = s.volumeStateStore.Load(targetPath)
		}
		// The volume state is safe to access for the remainder of the function since
		// the target path lock taken above is held until this call returns.
		if !vs.BucketAccessCheckPassed {
			if err := s.verifyBucketAccess(ctx, vc, bucketName); err != nil {
				return nil, err
			}

			vs.BucketAccessCheckPassed = true
		}
	}

	// Look up the Pod that owns this volume; it is needed for the host network,
	// sidecar and metrics checks below.
	pod, err := s.k8sClients.GetPod(vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName])
	if err != nil {
		return nil, status.Errorf(codes.NotFound, "failed to get pod: %v", err)
	}

	// Host network Pods that need the token server get the identity provider
	// passed down as an extra mount option.
	if s.shouldStartTokenServer(pod) && pod.Spec.HostNetwork {
		identityProvider := s.driver.config.TokenManager.GetIdentityProvider()
		fuseMountOptions = joinMountOptions(fuseMountOptions, []string{"token-server-identity-provider=" + identityProvider})
	}

	node, err := s.k8sClients.GetNode(s.driver.config.NodeID)
	if err != nil {
		return nil, status.Errorf(codes.NotFound, "failed to get node: %v", err)
	}

	// Workload Identity counts as enabled only when the node label is exactly
	// "true". A missing label reads as the empty string, so this single comparison
	// covers both the absent label and any other value.
	isWorkloadIdentityDisabled := node.Labels[clientset.GkeMetaDataServerKey] != "true"
	if isWorkloadIdentityDisabled && !pod.Spec.HostNetwork {
		return nil, status.Error(codes.FailedPrecondition, "Workload Identity Federation is not enabled on node. Please make sure this is enabled on both cluster and node pool level (https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity)")
	}

	// Since the webhook mutating ordering is not definitive,
	// the sidecar position is not checked in the ValidatePodHasSidecarContainerInjected func.
	shouldInjectedByWebhook := strings.ToLower(pod.Annotations[webhook.GcsFuseVolumeEnableAnnotation]) == util.TrueStr
	sidecarInjected, isInitContainer := webhook.ValidatePodHasSidecarContainerInjected(pod)
	if !sidecarInjected {
		// The Pod asked for injection through the annotation, so a missing sidecar
		// is reported as an internal webhook failure rather than a user error.
		if shouldInjectedByWebhook {
			return nil, status.Error(codes.Internal, "the webhook failed to inject the sidecar container into the Pod spec")
		}

		return nil, status.Error(codes.FailedPrecondition, "failed to find the sidecar container in Pod spec")
	}

	// Register metrics collector.
	// It is idempotent to register the same collector in node republish calls.
	if s.driver.config.MetricsManager != nil && !disableMetricsCollection {
		klog.V(6).Infof("NodePublishVolume enabling metrics collector for target path %q", targetPath)
		s.driver.config.MetricsManager.RegisterMetricsCollector(targetPath, pod.Namespace, pod.Name, bucketName)
	}

	// Check if the sidecar container is still required,
	// if not, put an exit file to the emptyDir path to
	// notify the sidecar container to exit.
	if !isInitContainer {
		if err := putExitFile(pod, targetPath); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	// Check if there is any error from the gcsfuse.
	code, err := checkGcsFuseErr(isInitContainer, pod, targetPath)
	if code != codes.OK {
		// codes.Canceled is not treated as a failure: the call reports success
		// without mounting.
		if code == codes.Canceled {
			klog.V(4).Infof("NodePublishVolume on volume %q to target path %q is not needed because the gcsfuse has terminated.", bucketName, targetPath)

			return &csi.NodePublishVolumeResponse{}, nil
		}

		return nil, status.Error(code, err.Error())
	}

	// Check if there is any error from the sidecar container.
	code, err = checkSidecarContainerErr(isInitContainer, pod)
	if code != codes.OK {
		return nil, status.Error(code, err.Error())
	}

	// TODO: Check if the socket listener timed out

	// Check if the target path is already mounted.
	mounted, err := s.isDirMounted(targetPath)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to check if path %q is already mounted: %v", targetPath, err)
	}

	if mounted {
		klog.V(4).Infof("NodePublishVolume succeeded on volume %q to target path %q, mount already exists.", bucketName, targetPath)

		return &csi.NodePublishVolumeResponse{}, nil
	}

	klog.V(4).Infof("NodePublishVolume attempting mkdir for path %q", targetPath)
	if err := os.MkdirAll(targetPath, 0o750); err != nil {
		return nil, status.Errorf(codes.Internal, "mkdir failed for path %q: %v", targetPath, err)
	}

	// Start to mount.
	if err = s.mounter.Mount(bucketName, targetPath, FuseMountType, fuseMountOptions); err != nil {
		return nil, status.Errorf(codes.Internal, "failed to mount volume %q to target path %q: %v", bucketName, targetPath, err)
	}

	klog.V(4).Infof("NodePublishVolume succeeded on volume %q to target path %q", bucketName, targetPath)

	return &csi.NodePublishVolumeResponse{}, nil
}

// verifyBucketAccess prepares a storage service from the volume context vc and
// uses it to check that bucketName exists. It returns nil when the bucket is
// reported as existing and a gRPC status error otherwise:
// codes.Unauthenticated when the storage service cannot be prepared, the code
// derived by storage.ParseErrCode when the existence check returns an error,
// and codes.NotFound when the check reports a missing bucket without an error.
//
// The storage service is closed before this helper returns, so it is not kept
// open for the rest of the publish call.
func (s *nodeServer) verifyBucketAccess(ctx context.Context, vc map[string]string, bucketName string) error {
	storageService, err := s.prepareStorageService(ctx, vc)
	if err != nil {
		return status.Errorf(codes.Unauthenticated, "failed to prepare storage service: %v", err)
	}
	defer storageService.Close()

	exist, err := storageService.CheckBucketExists(ctx, &storage.ServiceBucket{Name: bucketName})
	if exist {
		return nil
	}

	// Guard the (false, nil) result explicitly: formatting a nil error prints
	// "<nil>", and a status created with codes.OK is a nil error, which would let
	// the caller return a nil response together with a nil error.
	if err == nil {
		return status.Errorf(codes.NotFound, "failed to get GCS bucket %q: bucket does not exist", bucketName)
	}

	return status.Errorf(storage.ParseErrCode(err), "failed to get GCS bucket %q: %v", bucketName, err)
}