in pkg/controller/sub_controller/sub_controller.go [474:515]
func (d *SubDefaultController) patchPVCs(ctx context.Context, dcr *dorisv1.DorisCluster, selector map[string]string,
pvcs []corev1.PersistentVolumeClaim, stsName string, volume dorisv1.PersistentVolume, replicas int32) bool {
//patch already exist in k8s .
prepared := true
for _, pvc := range pvcs {
oldCapacity := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
newCapacity := volume.PersistentVolumeClaimSpec.Resources.Requests[corev1.ResourceStorage]
if !oldCapacity.Equal(newCapacity) {
// if pvc need update, the resource have not prepared, return false.
prepared = false
eventType := EventNormal
reason := PVCUpdate
message := pvc.Name + " update successfully!"
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = newCapacity
if err := d.K8sclient.Patch(ctx, &pvc, client.Merge); err != nil {
klog.Errorf("SubDefaultController namespace %s name %s patch pvc %s failed, %s", dcr.Namespace, dcr.Name, pvc.Name, err.Error())
eventType = EventWarning
reason = PVCUpdateFailed
message = pvc.Name + " update failed, " + err.Error()
}
d.K8srecorder.Event(dcr, string(eventType), reason, message)
}
}
// if need add new pvc, the resource prepared not finished, return false.
if len(pvcs) < int(replicas) {
prepared = false
d.K8srecorder.Event(dcr, string(EventNormal), PVCCreate, fmt.Sprintf("create PVC ordinal %d - %d", len(pvcs), replicas))
}
baseOrdinal := len(pvcs)
for ; baseOrdinal < int(replicas); baseOrdinal++ {
pvc := resource.BuildPVC(volume, selector, dcr.Namespace, stsName, strconv.Itoa(baseOrdinal))
if err := d.K8sclient.Create(ctx, &pvc); err != nil && !apierrors.IsAlreadyExists(err) {
d.K8srecorder.Event(dcr, string(EventWarning), PVCCreateFailed, err.Error())
klog.Errorf("SubDefaultController namespace %s name %s create pvc %s failed, %s.", dcr.Namespace, dcr.Name, pvc.Name, err.Error())
}
}
return prepared
}