in pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go [431:496]
func (dcgs *DisaggregatedComputeGroupsController) ClearStatefulsetUnusedPVCs(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, cgs dv1.ComputeGroupStatus) error {
var cg *dv1.ComputeGroup
for i := range ddc.Spec.ComputeGroups {
/* uniqueId := ddc.GetCGId(&ddc.Spec.ComputeGroups[i])
if clusterId == cgs.ClusterId {
cg = &ddc.Spec.ComputeGroups[i]
}*/
if ddc.Spec.ComputeGroups[i].UniqueId == cgs.UniqueId {
cg = &ddc.Spec.ComputeGroups[i]
}
}
currentPVCs := corev1.PersistentVolumeClaimList{}
//pvcMap := make(map[string]*corev1.PersistentVolumeClaim)
pvcLabels := dcgs.newCGPodsSelector(ddc.Name, cgs.UniqueId)
if err := dcgs.K8sclient.List(ctx, ¤tPVCs, client.InNamespace(ddc.Namespace), client.MatchingLabels(pvcLabels)); err != nil {
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), sc.PVCListFailed, fmt.Sprintf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs list pvc failed:%s!", err.Error()))
return err
}
// now only clear scale down pod.
if cg == nil {
return nil
}
var clearPVC []string
//we should use statefulset replicas for avoiding the phase=scaleDown, when phase `scaleDown` cg' replicas is less than statefuslet.
stsName := ddc.GetCGStatefulsetName(cg)
sts, err := k8s.GetStatefulSet(ctx, dcgs.K8sclient, ddc.Namespace, stsName)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs get statefulset namespace=%s, name=%s, failed, err=%s", ddc.Namespace, stsName, err.Error())
//waiting next reconciling.
return nil
}
replicas := *sts.Spec.Replicas
for _, pvc := range currentPVCs.Items {
pvcName := pvc.Name
sl := strings.Split(pvcName, stsName+"-")
if len(sl) != 2 {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs namespace %s name %s not format pvc name format.", ddc.Namespace, pvcName)
continue
}
var index int64
var perr error
index, perr = strconv.ParseInt(sl[1], 10, 32)
if perr != nil {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs namespace %s name %s index parse failed, err=%s", ddc.Namespace, pvcName, err.Error())
continue
}
if int32(index) >= replicas {
clearPVC = append(clearPVC, pvcName)
}
}
var mergeError error
for _, pvcName := range clearPVC {
if err = k8s.DeletePVC(ctx, dcgs.K8sclient, ddc.Namespace, pvcName, pvcLabels); err != nil {
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), sc.PVCDeleteFailed, err.Error())
klog.Errorf("ClearStatefulsetUnusedPVCs deletePVCs failed: namespace %s, name %s delete pvc %s, err: %s .", ddc.Namespace, pvcName, pvcName, err.Error())
mergeError = utils.MergeError(mergeError, err)
}
}
return mergeError
}