func()

in pkg/csi/manila/controllerserver.go [76:178]


// CreateVolume handles the CSI CreateVolume call for a Manila share.
//
// The request is processed in these steps:
//  1. Validate the request and build the share options, share metadata and
//     OpenStack options from the request parameters and secrets.
//  2. Reject the call with codes.Aborted if another CreateVolume for the same
//     volume name is still in flight.
//  3. Build a Manila client and look up the capabilities of the share type.
//  4. Retrieve an existing share or create a new one through the volume
//     creator selected for the request's content source, then verify that the
//     share is compatible with the request.
//  5. Get or grant the access rule for the share via the share adapter of the
//     configured protocol.
//
// On success the response carries the share ID as the volume ID and a volume
// context that includes "shareID" and "shareAccessID". Errors produced in this
// function are gRPC status errors; errors returned by prepareShareMetadata,
// getVolumeCreator and volCreator.create are passed through unchanged.
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	if err := validateCreateVolumeRequest(req); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// Configuration

	params := req.GetParameters()
	if params == nil {
		// Writing to a nil map panics, so make sure there is one to hold "protocol".
		params = make(map[string]string)
	}

	// The protocol is dictated by the driver instance, not by the caller.
	params["protocol"] = cs.d.shareProto

	shareOpts, err := options.NewControllerVolumeContext(params)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid volume parameters: %v", err)
	}

	shareMetadata, err := prepareShareMetadata(shareOpts.AppendShareMetadata, cs.d.clusterID)
	if err != nil {
		return nil, err
	}

	osOpts, err := options.NewOpenstackOptions(req.GetSecrets())
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid OpenStack secrets: %v", err)
	}

	// Check for pending CreateVolume for this volume name
	if _, isPending := pendingVolumes.LoadOrStore(req.GetName(), true); isPending {
		return nil, status.Errorf(codes.Aborted, "volume %s is already being processed", req.GetName())
	}
	// Release the name on every exit path below so that a retry can proceed.
	defer pendingVolumes.Delete(req.GetName())

	manilaClient, err := cs.d.manilaClientBuilder.New(osOpts)
	if err != nil {
		return nil, status.Errorf(codes.Unauthenticated, "failed to create Manila v2 client: %v", err)
	}

	shareTypeCaps, err := capabilities.GetManilaCapabilities(shareOpts.Type, manilaClient)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get Manila capabilities for share type %s: %v", shareOpts.Type, err)
	}

	requestedSize := req.GetCapacityRange().GetRequiredBytes()
	if requestedSize == 0 {
		// At least 1GiB
		requestedSize = 1 * bytesInGiB
	}

	sizeInGiB := bytesToGiB(requestedSize)

	// Retrieve an existing share or create a new one

	volCreator, err := getVolumeCreator(req.GetVolumeContentSource(), shareOpts, cs.d.compatOpts)
	if err != nil {
		return nil, err
	}

	share, err := volCreator.create(req, req.GetName(), sizeInGiB, manilaClient, shareOpts, shareMetadata)
	if err != nil {
		return nil, err
	}

	if err = verifyVolumeCompatibility(sizeInGiB, req, share, shareOpts, cs.d.compatOpts, shareTypeCaps); err != nil {
		return nil, status.Errorf(codes.AlreadyExists, "volume %s already exists, but is incompatible with the request: %v", req.GetName(), err)
	}

	// Grant access to the share

	ad := getShareAdapter(shareOpts.Protocol)

	accessRight, err := ad.GetOrGrantAccess(&shareadapters.GrantAccessArgs{Share: share, ManilaClient: manilaClient, Options: shareOpts})
	if err != nil {
		if err == wait.ErrWaitTimeout {
			// The adapter returned an error, so accessRight is not guaranteed to
			// be set; dereferencing it unconditionally could panic instead of
			// reporting the timeout.
			if accessRight == nil {
				return nil, status.Errorf(codes.DeadlineExceeded, "deadline exceeded while waiting for access rule for volume %s to become available", share.Name)
			}

			return nil, status.Errorf(codes.DeadlineExceeded, "deadline exceeded while waiting for access rule %s for volume %s to become available", accessRight.ID, share.Name)
		}

		return nil, status.Errorf(codes.Internal, "failed to grant access to volume %s: %v", share.Name, err)
	}

	var accessibleTopology []*csi.Topology
	if cs.d.withTopology {
		// All requisite/preferred topologies are considered valid. Nodes from those zones are required to be able to reach the storage.
		// The operator is responsible for making sure that provided topology keys are valid and present on the nodes of the cluster.
		accessibleTopology = req.GetAccessibilityRequirements().GetPreferred()
	}

	// Only the fields listed by options.NodeVolumeContextFields are passed to
	// the filter; the share and access rule IDs are added on top.
	volCtx := filterParametersForVolumeContext(params, options.NodeVolumeContextFields())
	volCtx["shareID"] = share.ID
	volCtx["shareAccessID"] = accessRight.ID

	return &csi.CreateVolumeResponse{
		Volume: &csi.Volume{
			VolumeId:           share.ID,
			ContentSource:      req.GetVolumeContentSource(),
			AccessibleTopology: accessibleTopology,
			// Report the capacity rounded to whole GiB, as computed above.
			CapacityBytes: int64(sizeInGiB) * bytesInGiB,
			VolumeContext: volCtx,
		},
	}, nil
}