func()

in pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go [431:496]


func (dcgs *DisaggregatedComputeGroupsController) ClearStatefulsetUnusedPVCs(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, cgs dv1.ComputeGroupStatus) error {
	var cg *dv1.ComputeGroup
	for i := range ddc.Spec.ComputeGroups {
		/*	uniqueId := ddc.GetCGId(&ddc.Spec.ComputeGroups[i])
			if clusterId == cgs.ClusterId {
				cg = &ddc.Spec.ComputeGroups[i]
			}*/
		if ddc.Spec.ComputeGroups[i].UniqueId == cgs.UniqueId {
			cg = &ddc.Spec.ComputeGroups[i]
		}
	}

	currentPVCs := corev1.PersistentVolumeClaimList{}
	//pvcMap := make(map[string]*corev1.PersistentVolumeClaim)

	pvcLabels := dcgs.newCGPodsSelector(ddc.Name, cgs.UniqueId)

	if err := dcgs.K8sclient.List(ctx, &currentPVCs, client.InNamespace(ddc.Namespace), client.MatchingLabels(pvcLabels)); err != nil {
		dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), sc.PVCListFailed, fmt.Sprintf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs list pvc failed:%s!", err.Error()))
		return err
	}

	// now only clear scale down pod.
	if cg == nil {
		return nil
	}

	var clearPVC []string
	//we should use statefulset replicas for avoiding the phase=scaleDown, when phase `scaleDown` cg' replicas is less than statefuslet.
	stsName := ddc.GetCGStatefulsetName(cg)
	sts, err := k8s.GetStatefulSet(ctx, dcgs.K8sclient, ddc.Namespace, stsName)
	if err != nil {
		klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs get statefulset namespace=%s, name=%s, failed, err=%s", ddc.Namespace, stsName, err.Error())
		//waiting next reconciling.
		return nil
	}
	replicas := *sts.Spec.Replicas
	for _, pvc := range currentPVCs.Items {
		pvcName := pvc.Name
		sl := strings.Split(pvcName, stsName+"-")
		if len(sl) != 2 {
			klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs namespace %s name %s not format pvc name format.", ddc.Namespace, pvcName)
			continue
		}
		var index int64
		var perr error
		index, perr = strconv.ParseInt(sl[1], 10, 32)
		if perr != nil {
			klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs namespace %s name %s index parse failed, err=%s", ddc.Namespace, pvcName, err.Error())
			continue
		}
		if int32(index) >= replicas {
			clearPVC = append(clearPVC, pvcName)
		}
	}

	var mergeError error
	for _, pvcName := range clearPVC {
		if err = k8s.DeletePVC(ctx, dcgs.K8sclient, ddc.Namespace, pvcName, pvcLabels); err != nil {
			dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), sc.PVCDeleteFailed, err.Error())
			klog.Errorf("ClearStatefulsetUnusedPVCs deletePVCs failed: namespace %s, name %s delete pvc %s, err: %s .", ddc.Namespace, pvcName, pvcName, err.Error())
			mergeError = utils.MergeError(mergeError, err)
		}
	}
	return mergeError
}