func()

in pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/pvc.go [39:84]


// listAndDeletePersistentVolumeClaim removes the FE PersistentVolumeClaims that are
// left over after the FE statefulset has been scaled down.
//
// It lists every PVC in ddc's namespace that carries the FE pod selector labels,
// extracts the trailing pod ordinal from each PVC name and deletes the PVCs whose
// ordinal is greater than or equal to the desired FE replica count.
//
// PVC names are expected to follow the statefulset volume-claim-template format
// "{volumeName}-{statefulsetName}-{ordinal}". PVCs whose names do not match that
// format are logged and left untouched.
//
// It returns the list error if listing fails; otherwise it returns the delete
// errors merged through utils.MergeError, or nil when every delete succeeded
// (or nothing had to be deleted).
func (dfc *DisaggregatedFEController) listAndDeletePersistentVolumeClaim(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) error {
	spec := ddc.Spec.FeSpec
	// Without a desired replica count there is no safe way to decide which PVCs are
	// surplus, so skip the cleanup instead of dereferencing a nil pointer.
	if spec.Replicas == nil {
		klog.Errorf("DisaggregatedFEController listAndDeletePersistentVolumeClaim namespace %s name %s fe replicas is not set, skip clearing pvc.", ddc.Namespace, ddc.Name)
		return nil
	}
	replicas := *spec.Replicas

	currentPVCs := corev1.PersistentVolumeClaimList{}
	pvcLabels := dfc.newFEPodsSelector(ddc.Name)
	stsName := ddc.GetFEStatefulsetName()

	if err := dfc.K8sclient.List(ctx, &currentPVCs, client.InNamespace(ddc.Namespace), client.MatchingLabels(pvcLabels)); err != nil {
		dfc.K8srecorder.Event(ddc, string(sub_controller.EventWarning), sub_controller.PVCListFailed, fmt.Sprintf("DisaggregatedFEController listAndDeletePersistentVolumeClaim list pvc failed:%s!", err.Error()))
		return err
	}

	// The old cleanup matched PVCs by their exact names, but PVCs generated from the
	// new template are named differently. The new cleanup therefore relies on the
	// statefulset naming format "{volumeName}-{statefulsetName}-{ordinal}" and compares
	// the ordinal with the desired replica count.
	//
	// The ordinal is whatever follows the LAST occurrence of "{statefulsetName}-", so a
	// volume name that itself contains the statefulset name is still parsed correctly.
	ordinalSep := stsName + "-"
	var clearPVC []string
	for i := range currentPVCs.Items {
		pvcName := currentPVCs.Items[i].Name

		sepAt := strings.LastIndex(pvcName, ordinalSep)
		if sepAt < 0 {
			klog.Errorf("DisaggregatedFEController listAndDeletePersistentVolumeClaim namespace %s pvcName %s is not match statefulset format.", ddc.Namespace, pvcName)
			continue
		}

		// bitSize 32 guarantees the parsed value fits the int32 conversion below.
		index, perr := strconv.ParseInt(pvcName[sepAt+len(ordinalSep):], 10, 32)
		if perr != nil {
			klog.Errorf("DisaggregatedFEController listAndDeletePersistentVolumeClaim namespace %s pvcName %s parse index faield, err=%s", ddc.Namespace, pvcName, perr.Error())
			continue
		}

		// Ordinals are zero based: pods 0..replicas-1 are still wanted.
		if int32(index) >= replicas {
			clearPVC = append(clearPVC, pvcName)
		}
	}

	// Try every surplus PVC even when one delete fails, and report all failures together.
	var mergeError error
	for _, pvcName := range clearPVC {
		if err := k8s.DeletePVC(ctx, dfc.K8sclient, ddc.Namespace, pvcName, pvcLabels); err != nil {
			dfc.K8srecorder.Event(ddc, string(sub_controller.EventWarning), sub_controller.PVCDeleteFailed, err.Error())
			klog.Errorf("listAndDeletePersistentVolumeClaim deletePVCs failed: namespace %s, name %s delete pvc %s, err: %s .", ddc.Namespace, ddc.Name, pvcName, err.Error())
			mergeError = utils.MergeError(mergeError, err)
		}
	}
	return mergeError
}