in pkg/controller/sub_controller/sub_controller.go [541:597]
// listAndDeletePersistentVolumeClaim cleans up PersistentVolumeClaims of the
// given component when more claims exist for a volume than the replica count
// declared in the DorisCluster spec.
//
// It resolves the desired replicas for componentType, lists the claims in the
// cluster namespace that carry the component's statefulset selector labels,
// counts them per volume returned by GetFinalPersistentVolumes, and calls
// d.deletePVCs for every volume that has more claims than replicas.
//
// If the component spec or its Replicas field is unset the expected claim
// count is unknown, so the function returns nil without deleting anything.
// Component types other than FE, BE and CN are treated as expecting zero
// replicas, as before.
//
// Errors from GetFinalPersistentVolumes and from listing claims are reported
// as warning events on dcr and returned. Errors from deletePVCs are merged
// with utils.MergeError so that one failing volume does not stop the others.
func (d *SubDefaultController) listAndDeletePersistentVolumeClaim(ctx context.Context, dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) error {
	// Resolve the desired replica count without dereferencing nil pointers.
	var desired *int32
	switch componentType {
	case dorisv1.Component_FE:
		if spec := dcr.Spec.FeSpec; spec != nil {
			desired = spec.Replicas
		}
	case dorisv1.Component_BE:
		if spec := dcr.Spec.BeSpec; spec != nil {
			desired = spec.Replicas
		}
	case dorisv1.Component_CN:
		if spec := dcr.Spec.CnSpec; spec != nil {
			desired = spec.Replicas
		}
	default:
		// Other component types keep the previous expectation of zero replicas.
		desired = new(int32)
	}
	if desired == nil {
		// Without a declared replica count no claim can safely be called surplus.
		return nil
	}
	replicas := *desired

	dorisPersistentVolumes, err := d.GetFinalPersistentVolumes(ctx, dcr, componentType)
	if err != nil {
		d.K8srecorder.Event(dcr, string(EventWarning), PVCExplainFailed, fmt.Sprintf("listAndDeletePersistentVolumeClaim %s GetFinalPersistentVolumes failed:%s", componentType, err.Error()))
		return err
	}

	pvcList := corev1.PersistentVolumeClaimList{}
	selector := dorisv1.GenerateStatefulSetSelector(dcr, componentType)
	stsName := dorisv1.GenerateComponentStatefulSetName(dcr, componentType)
	if err := d.K8sclient.List(ctx, &pvcList, client.InNamespace(dcr.Namespace), client.MatchingLabels(selector)); err != nil {
		d.K8srecorder.Event(dcr, string(EventWarning), PVCListFailed, string("list component "+componentType+" failed!"))
		return err
	}

	// Count claims per volume. A claim name is composed of the volume name, the
	// statefulset name and the pod ordinal; StatefulSet-created claims join
	// these with "-". Matching the full "<volume>-<statefulset>-" prefix keeps
	// a volume whose name is a prefix of another volume's name (e.g. "data"
	// and "data-log") from absorbing the other volume's claims.
	claimCounts := make(map[string]int, len(dorisPersistentVolumes))
	for i := range pvcList.Items {
		pvcName := pvcList.Items[i].Name
		for _, volume := range dorisPersistentVolumes {
			// An empty volume name cannot identify any claim.
			if volume.Name == "" {
				continue
			}
			if strings.HasPrefix(pvcName, volume.Name+"-"+stsName+"-") {
				claimCounts[volume.Name]++
				break
			}
		}
	}

	var mergeError error
	for _, volume := range dorisPersistentVolumes {
		// Skip unnamed volumes so they never pick up claims that matched no volume.
		if volume.Name == "" {
			continue
		}
		// Only volumes with more claims than desired replicas need cleanup.
		claimCount := claimCounts[volume.Name]
		if claimCount <= int(replicas) {
			continue
		}
		if err := d.deletePVCs(ctx, dcr, selector, claimCount, stsName, volume.Name, replicas); err != nil {
			mergeError = utils.MergeError(mergeError, err)
		}
	}
	return mergeError
}