func()

in pkg/controller/sub_controller/sub_controller.go [414:472]


// preparePersistentVolumeClaim reconciles the PersistentVolumeClaims of one component.
//
// It resolves the component's final persistent volumes, lists the PVCs in the cluster's
// namespace that carry the component's StatefulSet selector labels, groups those PVCs by
// the volume they belong to, and passes every operator-provisioned volume together with
// its PVCs to patchPVCs.
//
// It returns true only when patchPVCs reported success for every operator-provisioned
// volume. It returns false when resolving the volumes or listing the PVCs fails; in both
// cases a warning event is recorded on dcr.
func (d *SubDefaultController) preparePersistentVolumeClaim(ctx context.Context, dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) bool {
	// NOTE(review): the Replicas pointers are dereferenced without a nil check; presumably
	// they are defaulted before this point — confirm against the callers.
	var replicas int32
	switch componentType {
	case dorisv1.Component_FE:
		replicas = *dcr.Spec.FeSpec.Replicas
	case dorisv1.Component_BE:
		replicas = *dcr.Spec.BeSpec.Replicas
	case dorisv1.Component_CN:
		replicas = *dcr.Spec.CnSpec.Replicas
	default:
		// Any other component type keeps replicas at zero.
	}

	dorisPersistentVolumes, err := d.GetFinalPersistentVolumes(ctx, dcr, componentType)
	if err != nil {
		d.K8srecorder.Event(dcr, string(EventWarning), PVCExplainFailed, fmt.Sprintf("preparePersistentVolumeClaim %s GetFinalPersistentVolumes failed:%s", componentType, err.Error()))
		return false
	}

	pvcList := corev1.PersistentVolumeClaimList{}
	selector := dorisv1.GenerateStatefulSetSelector(dcr, componentType)
	stsName := dorisv1.GenerateComponentStatefulSetName(dcr, componentType)
	if err := d.K8sclient.List(ctx, &pvcList, client.InNamespace(dcr.Namespace), client.MatchingLabels(selector)); err != nil {
		// Include the underlying error so the event explains why the list failed.
		d.K8srecorder.Event(dcr, string(EventWarning), PVCListFailed, fmt.Sprintf("list component %s failed: %s", componentType, err.Error()))
		return false
	}

	// Every map key starts with this marker so that a volume with an empty name never
	// collides with a real volume name; PVCs matching no volume are stored under the
	// bare marker.
	const keyPrefix = "-^"

	// Classify PVCs by the volume they belong to. A PVC name is built as
	// <volume name>-<statefulset name>-<ordinal>, so match on the "<volume>-<sts>-" prefix.
	// Matching on the bare volume name would be ambiguous when one volume name is a prefix
	// of another (e.g. "data" and "data-log", or "storage1" and "storage10").
	pvcMap := make(map[string][]corev1.PersistentVolumeClaim)
	for _, pvc := range pvcList.Items {
		key := keyPrefix
		for _, dorisPersistentVolume := range dorisPersistentVolumes {
			if dorisPersistentVolume.Name == "" {
				continue
			}
			if strings.HasPrefix(pvc.Name, dorisPersistentVolume.Name+"-"+stsName+"-") {
				key = keyPrefix + dorisPersistentVolume.Name
				break
			}
		}

		// append handles the missing-key (nil slice) case, no explicit initialisation needed.
		pvcMap[key] = append(pvcMap[key], pvc)
	}

	// prepared stays true only if patchPVCs succeeds for every operator-managed volume.
	// All volumes are visited even after a failure so each one gets a chance to be patched.
	prepared := true
	for _, dorisPersistentVolume := range dorisPersistentVolumes {
		// PVCs are managed here only when the provisioner is the operator.
		if dorisPersistentVolume.PVCProvisioner != dorisv1.PVCProvisionerOperator {
			continue
		}

		if !d.patchPVCs(ctx, dcr, selector, pvcMap[keyPrefix+dorisPersistentVolume.Name], stsName, dorisPersistentVolume, replicas) {
			prepared = false
		}
	}

	return prepared
}