func()

in vsphere/vsphere.go [1249:1477]


// CreateVolume creates a vSphere volume (a .vmdk file under the VolDir
// directory of the chosen datastore) and returns its canonical path.
//
// The datastore is chosen as follows:
//   - The datastore name is volumeOptions.Datastore with surrounding whitespace
//     removed. When that is empty, the workspace default datastore name is used
//     and the datastore counts as "not specified" for the rules below.
//   - The zones to search are volumeOptions.Zone, except when a SelectedNode is
//     given together with at most one zone: then only the selected node's
//     failure domain (if it has one) is searched.
//   - Storage policy name set, datastore not specified: a policy-compatible
//     datastore is picked, restricted to the zones when any are given.
//   - No storage policy name, datastore not specified, zones given: the zoned
//     datastore returned by getMostFreeDatastore is used.
//   - Otherwise the named datastore must be shared across all node VMs (no
//     zones) or be one of the datastores of the given zones.
//
// Side effects: volumeOptions.Datastore is overwritten with the name of the
// selected datastore, the outcome is recorded through
// vclib.RecordCreateVolumeMetric, and the dummy-VM clean-up goroutine is
// started by the first call that carries a storage policy name or VSAN
// storage profile data.
//
// On failure the returned path is empty and err describes the failed step.
func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
	klog.V(1).Infof("Starting to create a vSphere volume with volumeOptions: %+v", volumeOptions)
	createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
		var datastoreInfo *vclib.DatastoreInfo
		var dsList []*vclib.DatastoreInfo

		// Create context
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
		if err != nil {
			return "", err
		}
		// If datastore not specified, then use default datastore.
		// A whitespace-only value counts as "not specified" everywhere below, so
		// that the name used for the lookup and the selection rules agree.
		datastoreName := strings.TrimSpace(volumeOptions.Datastore)
		datastoreSpecified := datastoreName != ""
		if !datastoreSpecified {
			datastoreName = strings.TrimSpace(vs.cfg.Workspace.DefaultDatastore)
		}
		// The given datastoreName may be present in more than one datacenter
		candidateDatastoreInfos, err := vs.FindDatastoreByName(ctx, datastoreName)
		if err != nil {
			return "", err
		}
		// Each of the datastores found is a candidate for Volume creation.
		// One of these will be selected based on given policy and/or zone.
		// They are indexed by datastore URL so zoned datastores can be matched against them.
		candidateDatastores := make(map[string]*vclib.DatastoreInfo)
		for _, dsInfo := range candidateDatastoreInfos {
			candidateDatastores[dsInfo.Info.Url] = dsInfo
		}

		var vmOptions *vclib.VMOptions
		var zonesToSearch []string

		if volumeOptions.SelectedNode != nil {
			if len(volumeOptions.Zone) > 1 {
				// In waitForFirstConsumer mode, if more than one allowedTopologies is specified, the volume should satisfy all these.
				zonesToSearch = volumeOptions.Zone
			} else {
				// Pick the selectedNode's zone, if available.
				nodeInfo, err := vs.nodeManager.GetNodeInfoWithNodeObject(volumeOptions.SelectedNode)
				if err != nil {
					klog.Errorf("Unable to get node information for %s. err: %+v", volumeOptions.SelectedNode.Name, err)
					return "", err
				}
				klog.V(4).Infof("selectedNode info : %s", nodeInfo)
				if nodeInfo.zone != nil && nodeInfo.zone.FailureDomain != "" {
					zonesToSearch = append(zonesToSearch, nodeInfo.zone.FailureDomain)
				}
			}
		} else {
			// If no selectedNode, pick allowedTopologies, if provided.
			zonesToSearch = volumeOptions.Zone
		}
		klog.V(1).Infof("Volume topology : %s", zonesToSearch)

		if volumeOptions.VSANStorageProfileData != "" || volumeOptions.StoragePolicyName != "" {
			// If datastore and zone are specified, first validate if the datastore is in the provided zone.
			if len(zonesToSearch) != 0 && datastoreSpecified {
				klog.V(4).Infof("Specified zone : %s, datastore : %s", zonesToSearch, volumeOptions.Datastore)
				dsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)
				if err != nil {
					klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
					return "", err
				}

				// Validate if the datastore provided belongs to the zone. If not, fail the operation.
				found := false
				for _, ds := range dsList {
					if datastoreInfo, found = candidateDatastores[ds.Info.Url]; found {
						break
					}
				}
				if !found {
					err = fmt.Errorf("The specified datastore %s does not match the provided zones : %s", volumeOptions.Datastore, zonesToSearch)
					klog.Error(err)
					return "", err
				}
			}
			// Acquire a read lock to ensure multiple PVC requests can be processed simultaneously.
			// The deferred unlock runs when this closure returns, so the read lock
			// is held for the rest of the volume creation.
			cleanUpDummyVMLock.RLock()
			defer cleanUpDummyVMLock.RUnlock()
			// Create a new background routine that will delete any dummy VM's that are left stale.
			// This routine will get executed for every 5 minutes and gets initiated only once in its entire lifetime.
			cleanUpRoutineInitLock.Lock()
			if !cleanUpRoutineInitialized {
				klog.V(1).Infof("Starting a clean up routine to remove stale dummy VM's")
				go vs.cleanUpDummyVMs(DummyVMPrefixName)
				cleanUpRoutineInitialized = true
			}
			cleanUpRoutineInitLock.Unlock()
		}
		if volumeOptions.StoragePolicyName != "" && !datastoreSpecified {
			if len(zonesToSearch) == 0 {
				klog.V(4).Infof("Selecting a shared datastore as per the storage policy %s", volumeOptions.StoragePolicyName)
				datastoreInfo, err = getPbmCompatibleDatastore(ctx, vsi.conn.Client, volumeOptions.StoragePolicyName, vs.nodeManager)
			} else {
				// If zone is specified, first get the datastores in the zone.
				dsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)

				if err != nil {
					klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
					return "", err
				}

				klog.V(4).Infof("Specified zone : %s. Picking a datastore as per the storage policy %s among the zoned datastores : %s", zonesToSearch,
					volumeOptions.StoragePolicyName, dsList)
				// Among the compatible datastores, select the one based on the maximum free space.
				datastoreInfo, err = getPbmCompatibleZonedDatastore(ctx, vsi.conn.Client, volumeOptions.StoragePolicyName, dsList)
			}
			// Shared error check for both policy-based selection calls above.
			if err != nil {
				klog.Errorf("Failed to get pbm compatible datastore with storagePolicy: %s. err: %+v", volumeOptions.StoragePolicyName, err)
				return "", err
			}
			klog.V(1).Infof("Datastore selected as per policy : %s", datastoreInfo.Info.Name)
		} else {
			// If zone is specified, pick the datastore in the zone with maximum free space within the zone.
			if !datastoreSpecified && len(zonesToSearch) != 0 {
				klog.V(4).Infof("Specified zone : %s", zonesToSearch)
				dsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)

				if err != nil {
					klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
					return "", err
				}
				// If unable to get any datastore, fail the operation
				if len(dsList) == 0 {
					err = fmt.Errorf("Failed to find a shared datastore matching zone %s", zonesToSearch)
					klog.Error(err)
					return "", err
				}

				datastoreInfo, err = getMostFreeDatastore(ctx, nil, dsList)
				if err != nil {
					klog.Errorf("Failed to get shared datastore: %+v", err)
					return "", err
				}
				klog.V(1).Infof("Specified zone : %s. Selected datastore : %s", zonesToSearch, datastoreInfo.Info.Name)
			} else {
				var sharedDsList []*vclib.DatastoreInfo
				if len(zonesToSearch) == 0 {
					// If zone is not provided, get the shared datastore across all node VMs.
					klog.V(4).Infof("Validating if datastore %s is shared across all node VMs", datastoreName)
					sharedDSFinder := &sharedDatastore{
						nodeManager:         vs.nodeManager,
						candidateDatastores: candidateDatastoreInfos,
					}
					datastoreInfo, err = sharedDSFinder.getSharedDatastore(ctx)
					if err != nil {
						klog.Errorf("Failed to get shared datastore: %+v", err)
						return "", err
					}
					if datastoreInfo == nil {
						err = fmt.Errorf("The specified datastore %s is not a shared datastore across node VMs", datastoreName)
						klog.Error(err)
						return "", err
					}
				} else {
					// If zone is provided, get the shared datastores in that zone.
					klog.V(4).Infof("Validating if datastore %s is in zone %s ", datastoreName, zonesToSearch)
					sharedDsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)
					if err != nil {
						klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
						return "", err
					}
					// The first zoned datastore that is also a candidate (same URL) wins.
					found := false
					for _, sharedDs := range sharedDsList {
						if datastoreInfo, found = candidateDatastores[sharedDs.Info.Url]; found {
							klog.V(4).Infof("Datastore validation succeeded")
							break
						}
					}
					if !found {
						err = fmt.Errorf("The specified datastore %s does not match the provided zones : %s", datastoreName, zonesToSearch)
						klog.Error(err)
						return "", err
					}
				}
			}
		}

		// if datastoreInfo is still not determined, it is an error condition
		if datastoreInfo == nil {
			klog.Errorf("ambiguous datastore name %s, cannot be found among: %v", datastoreName, candidateDatastoreInfos)
			return "", fmt.Errorf("ambiguous datastore name %s", datastoreName)
		}
		ds := datastoreInfo.Datastore
		// Record the datastore that was actually selected in the caller's options.
		volumeOptions.Datastore = datastoreInfo.Info.Name
		vmOptions, err = vs.setVMOptions(ctx, vsi.conn, ds)
		if err != nil {
			klog.Errorf("failed to set VM options required to create a vsphere volume. err: %+v", err)
			return "", err
		}
		kubeVolsPath := filepath.Clean(ds.Path(VolDir)) + "/"
		err = ds.CreateDirectory(ctx, kubeVolsPath, false)
		// An already existing volume directory is not an error.
		if err != nil && err != vclib.ErrFileAlreadyExist {
			klog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
			return "", err
		}
		volumePath := kubeVolsPath + volumeOptions.Name + ".vmdk"
		disk := diskmanagers.VirtualDisk{
			DiskPath:      volumePath,
			VolumeOptions: volumeOptions,
			VMOptions:     vmOptions,
		}
		volumePath, err = disk.Create(ctx, ds)
		if err != nil {
			klog.Errorf("Failed to create a vsphere volume with volumeOptions: %+v on datastore: %s. err: %+v", volumeOptions, ds, err)
			return "", err
		}
		// Get the canonical path for the volume path.
		canonicalVolumePath, err = getcanonicalVolumePath(ctx, datastoreInfo.Datacenter, volumePath)
		if err != nil {
			klog.Errorf("Failed to get canonical vsphere volume path for volume: %s with volumeOptions: %+v on datastore: %s. err: %+v", volumePath, volumeOptions, ds, err)
			return "", err
		}
		if filepath.Base(datastoreName) != datastoreName {
			// If datastore is within cluster, add cluster path to the volumePath
			canonicalVolumePath = strings.Replace(canonicalVolumePath, filepath.Base(datastoreName), datastoreName, 1)
		}
		return canonicalVolumePath, nil
	}
	requestTime := time.Now()
	canonicalVolumePath, err = createVolumeInternal(volumeOptions)
	// The metric is recorded for both successful and failed requests.
	vclib.RecordCreateVolumeMetric(volumeOptions, requestTime, err)
	if err == nil {
		// Only announce the path when a volume was actually created.
		klog.V(4).Infof("The canonical volume path for the newly created vSphere volume is %q", canonicalVolumePath)
	}
	return canonicalVolumePath, err
}