pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package computegroups
import (
"context"
"errors"
"fmt"
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/resource"
"github.com/apache/doris-operator/pkg/common/utils/set"
sc "github.com/apache/doris-operator/pkg/controller/sub_controller"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"regexp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
"sync"
)
var _ sc.DisaggregatedSubController = &DisaggregatedComputeGroupsController{}
var (
disaggregatedComputeGroupsController = "disaggregatedComputeGroupsController"
)
type DisaggregatedComputeGroupsController struct {
sc.DisaggregatedSubDefaultController
}
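// New creates a DisaggregatedComputeGroupsController using the manager's client and an event recorder named after the controller.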
func New(mgr ctrl.Manager) *DisaggregatedComputeGroupsController {
return &DisaggregatedComputeGroupsController{
sc.DisaggregatedSubDefaultController{
K8sclient: mgr.GetClient(),
K8srecorder: mgr.GetEventRecorderFor(disaggregatedComputeGroupsController),
ControllerName: disaggregatedComputeGroupsController,
},
}
}
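// Sync reconciles the compute groups of a DorisDisaggregatedCluster: it waits for fe to be available, validates the compute group specs, then syncs the service and statefulset of every compute group and aggregates any errors.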
func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj client.Object) error {
ddc := obj.(*dv1.DorisDisaggregatedCluster)
if len(ddc.Spec.ComputeGroups) == 0 {
klog.Errorf("disaggregatedComputeGroupsController sync disaggregatedDorisCluster namespace=%s,name=%s have not compute group spec.", ddc.Namespace, ddc.Name)
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.ComputeGroupsEmpty), "compute group empty, the cluster will not work normal.")
return nil
}
if !dcgs.feAvailable(ddc) {
dcgs.K8srecorder.Event(ddc, string(sc.EventNormal), string(sc.WaitFEAvailable), "fe is not ready yet.")
return nil
}
// validating compute group information.
if event, res := dcgs.validateComputeGroup(ddc.Spec.ComputeGroups); !res {
klog.Errorf("disaggregatedComputeGroupsController namespace=%s name=%s validateComputeGroup have not match specifications %s.", ddc.Namespace, ddc.Name, sc.EventString(event))
dcgs.K8srecorder.Eventf(ddc, string(event.Type), string(event.Reason), event.Message)
return errors.New("validating compute group failed")
}
var errs []error
cgs := ddc.Spec.ComputeGroups
for i := range cgs {
if event, err := dcgs.computeGroupSync(ctx, ddc, &cgs[i]); err != nil {
if event != nil {
dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
}
errs = append(errs, err)
klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
}
}
if len(errs) != 0 {
msg := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name)
for _, err := range errs {
msg += err.Error()
}
return errors.New(msg)
}
return nil
}
// validate compute group config information.
func (dcgs *DisaggregatedComputeGroupsController) validateComputeGroup(cgs []dv1.ComputeGroup) (*sc.Event, bool) {
dupl := dcgs.validateDuplicated(cgs)
if dupl != "" {
klog.Errorf("disaggregatedComputeGroupsController validateComputeGroup validate Duplicated have duplicate unique identifier %s.", dupl)
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGUniqueIdentifierDuplicate, Message: "unique identifier " + dupl + " duplicate in compute groups."}, false
}
if reg, res := dcgs.validateRegex(cgs); !res {
klog.Errorf("disaggregatedComputeGroupsController validateComputeGroup validateRegex %s have not match regular expression", reg)
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGUniqueIdentifierNotMatchRegex, Message: reg}, false
}
return nil, true
}
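// feAvailable reports whether the fe service has at least one ready endpoint address.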
func (dcgs *DisaggregatedComputeGroupsController) feAvailable(ddc *dv1.DorisDisaggregatedCluster) bool {
//if fe is deployed in k8s, wait until fe is available.
endpoints := corev1.Endpoints{}
if err := dcgs.K8sclient.Get(context.Background(), types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.GetFEServiceName()}, &endpoints); err != nil {
klog.Infof("disaggregatedComputeGroupsController Sync wait fe service name %s available occur failed %s\n", ddc.GetFEServiceName(), err.Error())
return false
}
for _, sub := range endpoints.Subsets {
if len(sub.Addresses) > 0 {
return true
}
}
return false
}
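// computeGroupSync builds the statefulset and service for one compute group from the cluster spec and its ConfigMap values, initializes the compute group status, checks the configured secrets, and reconciles the service and statefulset.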
func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
if cg.Replicas == nil {
cg.Replicas = resource.GetInt32Pointer(1)
}
cvs := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.BE_RESOLVEKEY, cg.CommonSpec.ConfigMaps)
st := dcgs.NewStatefulset(ddc, cg, cvs)
svc := dcgs.newService(ddc, cg, cvs)
dcgs.initialCGStatus(ddc, cg)
dcgs.CheckSecretMountPath(ddc, cg.Secrets)
dcgs.CheckSecretExist(ctx, ddc, cg.Secrets)
event, err := dcgs.DefaultReconcileService(ctx, svc)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcile service namespace %s name %s failed, err=%s", svc.Namespace, svc.Name, err.Error())
return event, err
}
event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error())
}
return event, err
}
// reconcileStatefulset creates the statefulset if it does not exist; otherwise it runs the pre-apply step and re-applies the statefulset unless the apply should be skipped. It returns an event describing any failure together with the error.
func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
var est appv1.StatefulSet
if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
}
return nil, nil
} else if err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset get statefulset failed, namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return nil, err
}
err := dcgs.preApplyStatefulSet(ctx, st, &est, cluster, cg)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
}
if skipApplyStatefulset(cluster, cg) {
return nil, nil
}
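// compare the new and existing statefulset by the spec hash annotation to decide whether the statefulset needs to be re-applied.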
if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
}); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
}
return nil, nil
}
// initialCGStatus initializes the compute group status before resources are synced. The status changes with the sync steps, and the final status is generated by classifying pods.
func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) {
cgss := ddc.Status.ComputeGroupStatuses
//clusterId := ddc.GetCGId(cg)
uniqueId := cg.UniqueId
defaultStatus := dv1.ComputeGroupStatus{
Phase: dv1.Reconciling,
UniqueId: cg.UniqueId,
StatefulsetName: ddc.GetCGStatefulsetName(cg),
ServiceName: ddc.GetCGServiceName(cg),
//set for status updated.
Replicas: *cg.Replicas,
}
for i := range cgss {
if cgss[i].UniqueId == uniqueId {
if cgss[i].Phase != dv1.Ready {
defaultStatus.Phase = cgss[i].Phase
}
defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas
cgss[i] = defaultStatus
return
}
}
//no existing status matched the unique id, append the default status.
ddc.Status.ComputeGroupStatuses = append(ddc.Status.ComputeGroupStatuses, defaultStatus)
}
// validateDuplicated checks whether any compute group unique identifier is duplicated and returns the duplicated keys.
func (dcgs *DisaggregatedComputeGroupsController) validateDuplicated(cgs []dv1.ComputeGroup) string {
dupl := ""
uniqueIds := set.NewSetString()
for _, cg := range cgs {
if uniqueIds.Find(cg.UniqueId) {
dupl = dupl + cg.UniqueId + ";"
}
uniqueIds.Add(cg.UniqueId)
}
return dupl
}
// validateRegex checks whether each cg unique id complies with the regular expression.
func (dcgs *DisaggregatedComputeGroupsController) validateRegex(cgs []dv1.ComputeGroup) (string, bool) {
var regStr = ""
for _, cg := range cgs {
res, err := regexp.Match(compute_group_name_regex, []byte(cg.UniqueId))
if !res {
regStr = regStr + cg.UniqueId + " not match " + compute_group_name_regex
}
//for debugging, output the error in log
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController validateRegex compute group name %s failed, err=%s", cg.UniqueId, err.Error())
}
}
if regStr != "" {
return regStr, false
}
return "", true
}
// ClearResources clears the resources of compute groups that are no longer configured and removes their statuses from ddc.Status.
func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Context, obj client.Object) (bool, error) {
ddc := obj.(*dv1.DorisDisaggregatedCluster)
var eCGs []dv1.ComputeGroupStatus
for i, cgs := range ddc.Status.ComputeGroupStatuses {
for _, cg := range ddc.Spec.ComputeGroups {
if cgs.UniqueId == cg.UniqueId {
eCGs = append(eCGs, ddc.Status.ComputeGroupStatuses[i])
break
}
}
}
//list the svcs and stss owned by the dorisDisaggregatedCluster.
cls := dcgs.GetCG2LayerCommonSchedulerLabels(ddc.Name)
svcs, err := k8s.ListServicesInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ListServicesInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
return false, err
}
stss, err := k8s.ListStatefulsetInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ListStatefulsetInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
return false, err
}
//clear unused service and statefulset.
delSvcNames := dcgs.findUnusedSvcs(svcs, ddc)
delStsNames, delUniqueIds := dcgs.findUnusedStssAndUniqueIds(stss, ddc)
if err = dcgs.clearCGInDorisMeta(ctx, delUniqueIds, ddc); err != nil {
return false, err
}
if err = dcgs.clearSvcs(ctx, delSvcNames, ddc); err != nil {
return false, err
}
if err = dcgs.clearStatefulsets(ctx, delStsNames, ddc); err != nil {
return false, err
}
//clear unused pvc
for i := range eCGs {
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear ComputeGroup reduced replicas PVC failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, eCGs[i].UniqueId, err.Error())
}
}
for _, uniqueId := range delUniqueIds {
//new fake computeGroup status for clear all pvcs owner reference to deleted compute group.
fakeCgs := dv1.ComputeGroupStatus{
UniqueId: uniqueId,
}
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, fakeCgs)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear deleted compute group failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, uniqueId, err.Error())
}
}
ddc.Status.ComputeGroupStatuses = eCGs
return true, nil
}
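// clearStatefulsets deletes the named statefulsets in the cluster's namespace.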
func (dcgs *DisaggregatedComputeGroupsController) clearStatefulsets(ctx context.Context, stsNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
for _, name := range stsNames {
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clear statefulset failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
return err
}
}
return nil
}
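// clearSvcs deletes the named services in the cluster's namespace.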
func (dcgs *DisaggregatedComputeGroupsController) clearSvcs(ctx context.Context, svcNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
for _, name := range svcNames {
if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clear service failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
return err
}
}
return nil
}
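// clearCGInDorisMeta drops the be nodes of the given compute groups from doris meta through the master sql client; the cg name in doris is the uniqueId with '-' replaced by '_'.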
func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context.Context, cgNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
if len(cgNames) == 0 {
return nil
}
sqlClient, err := dcgs.getMasterSqlClient(ctx, ddc)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
return err
}
defer sqlClient.Close()
for _, name := range cgNames {
//clear the cg with keepAmount = 0.
//make sure the right cgName is used: the cgName is derived from the uniqueId with '-' replaced by '_'.
cgName := strings.ReplaceAll(name, "-", "_")
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, 0)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
return err
}
}
return nil
}
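// findUnusedSvcs returns the names of services that are owned by the ddc but whose unique id no longer matches any configured compute group.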
func (dcgs *DisaggregatedComputeGroupsController) findUnusedSvcs(svcs []corev1.Service, ddc *dv1.DorisDisaggregatedCluster) []string {
var unusedSvcNames []string
for i := range svcs {
own := ownerReference2ddc(&svcs[i], ddc)
if !own {
//not owned by ddc, skip the service.
continue
}
svcUniqueId := getUniqueIdFromClientObject(&svcs[i])
exist := false
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
if ddc.Spec.ComputeGroups[j].UniqueId == svcUniqueId {
exist = true
break
}
}
if !exist {
unusedSvcNames = append(unusedSvcNames, svcs[i].Name)
}
}
return unusedSvcNames
}
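// findUnusedStssAndUniqueIds returns the names and unique ids of statefulsets that are owned by the ddc but no longer match any configured compute group.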
func (dcgs *DisaggregatedComputeGroupsController) findUnusedStssAndUniqueIds(stss []appv1.StatefulSet, ddc *dv1.DorisDisaggregatedCluster) ([]string /*sts*/, []string /*cgNames*/) {
var unusedStsNames []string
var unusedUniqueIds []string
for i := range stss {
own := ownerReference2ddc(&stss[i], ddc)
if !own {
//not owned by ddc, skip the statefulset.
continue
}
stsUniqueId := getUniqueIdFromClientObject(&stss[i])
exist := false
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
if ddc.Spec.ComputeGroups[j].UniqueId == stsUniqueId {
exist = true
break
}
}
if !exist {
unusedStsNames = append(unusedStsNames, stss[i].Name)
unusedUniqueIds = append(unusedUniqueIds, stsUniqueId)
}
}
return unusedStsNames, unusedUniqueIds
}
// ClearStatefulsetUnusedPVCs
// 1. delete unused pvcs, skipping when the cluster is Suspend.
// 2. delete unused pvcs of the statefulset.
// 3. delete pvcs that are not used by any statefulset.
func (dcgs *DisaggregatedComputeGroupsController) ClearStatefulsetUnusedPVCs(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, cgs dv1.ComputeGroupStatus) error {
var cg *dv1.ComputeGroup
for i := range ddc.Spec.ComputeGroups {
/* uniqueId := ddc.GetCGId(&ddc.Spec.ComputeGroups[i])
if clusterId == cgs.ClusterId {
cg = &ddc.Spec.ComputeGroups[i]
}*/
if ddc.Spec.ComputeGroups[i].UniqueId == cgs.UniqueId {
cg = &ddc.Spec.ComputeGroups[i]
}
}
currentPVCs := corev1.PersistentVolumeClaimList{}
//pvcMap := make(map[string]*corev1.PersistentVolumeClaim)
pvcLabels := dcgs.newCGPodsSelector(ddc.Name, cgs.UniqueId)
if err := dcgs.K8sclient.List(ctx, &currentPVCs, client.InNamespace(ddc.Namespace), client.MatchingLabels(pvcLabels)); err != nil {
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), sc.PVCListFailed, fmt.Sprintf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs list pvc failed:%s!", err.Error()))
return err
}
// currently only the pvcs of scaled-down pods are cleared.
if cg == nil {
return nil
}
var clearPVC []string
//use the statefulset replicas instead of the cg replicas: during the scaleDown phase the cg's replicas is lower than the statefulset's.
stsName := ddc.GetCGStatefulsetName(cg)
sts, err := k8s.GetStatefulSet(ctx, dcgs.K8sclient, ddc.Namespace, stsName)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs get statefulset namespace=%s, name=%s, failed, err=%s", ddc.Namespace, stsName, err.Error())
//waiting next reconciling.
return nil
}
replicas := *sts.Spec.Replicas
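// a statefulset pvc name has the form "<volume claim template name>-<statefulset name>-<ordinal>"; pvcs whose ordinal is not less than the current replicas belong to scaled-down pods and can be deleted.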
for _, pvc := range currentPVCs.Items {
pvcName := pvc.Name
sl := strings.Split(pvcName, stsName+"-")
if len(sl) != 2 {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs namespace %s name %s not format pvc name format.", ddc.Namespace, pvcName)
continue
}
index, perr := strconv.ParseInt(sl[1], 10, 32)
if perr != nil {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs namespace %s name %s index parse failed, err=%s", ddc.Namespace, pvcName, perr.Error())
continue
}
if int32(index) >= replicas {
clearPVC = append(clearPVC, pvcName)
}
}
var mergeError error
for _, pvcName := range clearPVC {
if err = k8s.DeletePVC(ctx, dcgs.K8sclient, ddc.Namespace, pvcName, pvcLabels); err != nil {
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), sc.PVCDeleteFailed, err.Error())
klog.Errorf("ClearStatefulsetUnusedPVCs deletePVCs failed: namespace %s, name %s delete pvc %s, err: %s .", ddc.Namespace, pvcName, pvcName, err.Error())
mergeError = utils.MergeError(mergeError, err)
}
}
return mergeError
}
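// GetControllerName returns the name of this sub controller.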
func (dcgs *DisaggregatedComputeGroupsController) GetControllerName() string {
return dcgs.ControllerName
}
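// UpdateComponentStatus refreshes the status of every compute group concurrently and aggregates the cluster health counters (total, fully available and available compute groups).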
func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj client.Object) error {
ddc := obj.(*dv1.DorisDisaggregatedCluster)
cgss := ddc.Status.ComputeGroupStatuses
if len(cgss) == 0 {
klog.Errorf("disaggregatedComputeGroupsController updateComponentStatus compute group status is empty!")
return nil
}
errChan := make(chan error, len(cgss))
wg := sync.WaitGroup{}
wg.Add(len(cgss))
for i := range cgss {
go func(idx int) {
defer wg.Done()
errChan <- dcgs.updateCGStatus(ddc, &cgss[idx])
}(i)
}
wg.Wait()
close(errChan)
errMs := ""
for err := range errChan {
if err != nil {
errMs += err.Error()
}
}
var fullAvailableCount int32
var availableCount int32
for _, cgs := range ddc.Status.ComputeGroupStatuses {
if cgs.Phase == dv1.Ready {
fullAvailableCount++
}
if cgs.AvailableReplicas > 0 {
availableCount++
}
}
ddc.Status.ClusterHealth.CGCount = int32(len(ddc.Status.ComputeGroupStatuses))
ddc.Status.ClusterHealth.CGFullAvailableCount = fullAvailableCount
ddc.Status.ClusterHealth.CGAvailableCount = availableCount
if errMs == "" {
return nil
}
return errors.New(errMs)
}
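// updateCGStatus lists the pods of one compute group, classifies them into available, creating and failed replicas, records the available replicas, and marks the compute group Ready when all replicas are available.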
func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisDisaggregatedCluster, cgs *dv1.ComputeGroupStatus) error {
selector := dcgs.newCGPodsSelector(ddc.Name, cgs.UniqueId)
var podList corev1.PodList
if err := dcgs.K8sclient.List(context.Background(), &podList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil {
return err
}
var availableReplicas int32
var creatingReplicas int32
var failedReplicas int32
//get the status of all pods controlled by the statefulset.
for _, pod := range podList.Items {
if ready := k8s.PodIsReady(&pod.Status); ready {
availableReplicas++
} else if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending {
creatingReplicas++
} else {
failedReplicas++
}
}
cgs.AvailableReplicas = availableReplicas
if availableReplicas == cgs.Replicas {
cgs.Phase = dv1.Ready
}
return nil
}