pkg/controller/sub_controller/fe/prepare_modify.go (118 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package fe import ( "context" v1 "github.com/apache/doris-operator/api/doris/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/mysql" "github.com/apache/doris-operator/pkg/common/utils/resource" sc "github.com/apache/doris-operator/pkg/controller/sub_controller" appv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "strconv" "strings" ) // prepareStatefulsetApply means Pre-operation and status control on the client side func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.DorisCluster, oldStatus v1.ComponentStatus) error { var oldSt appv1.StatefulSet err := fc.K8sclient.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: v1.GenerateComponentStatefulSetName(cluster, v1.Component_FE)}, &oldSt) if err != nil { klog.Infof("fe controller controlClusterPhaseAndPreOperation get fe StatefulSet failed, err: %s", err.Error()) return nil } if cluster.Spec.FeSpec.Replicas == nil { cluster.Spec.FeSpec.Replicas = resource.GetInt32Pointer(0) } fc.safeScaleDown(cluster, &oldSt) // wroa means: oldReplicas - newReplicas, the opposite of removedAmount, willRemovedOppositeAmount shortly as wroa wroa := *(cluster.Spec.FeSpec.Replicas) - *(oldSt.Spec.Replicas) // fe scale if wroa < 0 { if err := fc.dropObserverBySqlClient(ctx, fc.K8sclient, cluster); err != nil { klog.Errorf("ScaleDownObserver failed, err:%s ", err.Error()) return err } return nil } // fe rolling restart // check 1: fe Phase is Available // check 2: fe RestartTime is not empty and useful // check 3: fe RestartTime different from old(This condition does not need to be checked here. If it is allowed to pass, it will be processed idempotent when applying sts.) if oldStatus.ComponentCondition.Phase == v1.Available && fc.CheckRestartTimeAndInject(cluster, v1.Component_FE) { cluster.Status.FEStatus.ComponentCondition.Phase = v1.Restarting } //TODO check upgrade return nil } func (fc *Controller) safeScaleDown(cluster *v1.DorisCluster, ost *appv1.StatefulSet) { ele := cluster.GetElectionNumber() nr := *cluster.Spec.FeSpec.Replicas or := *ost.Spec.Replicas //if not scale down do nothing. if nr >= or { return } //if scale down observers,(replicas > election number), be allowed. if nr >= ele { return } if or >= ele { // if the scale down nodes have observer and follower roles, scale down observers. *cluster.Spec.FeSpec.Replicas = ele fc.K8srecorder.Event(cluster,string(sc.EventWarning), sc.FollowerScaleDownFailed,"Replicas is not allowed less than ElectionNumber, because of the bdbje (like raft) consistency protocol, if want do that please set ElectionNumber less than replicas. like that \"spec:{feSpec:{electionNumber}}\"") } else { //if the scale down nodes only have followers, not be allowed. *cluster.Spec.FeSpec.Replicas =or fc.K8srecorder.Event(cluster,string(sc.EventWarning), sc.FollowerScaleDownFailed,"Replicas less than electionNumber, so not allowed scale down. This is because the bdbje(like raft) consistency protocol, if want do that please set ElectionNumber less than replicas. like that \"spec:{feSpec:{electionNumber}}\"") } return } // dropObserverBySqlClient handles doris'SQL(drop frontend) through the MySQL client when dealing with scale in observer // targetDCR is new dcr func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error { // get adminuserName and pwd secret, _ := k8s.GetSecret(ctx, k8sclient, targetDCR.Namespace, targetDCR.Spec.AuthSecret) adminUserName, password := v1.GetClusterSecret(targetDCR, secret) // get host and port serviceName := v1.GenerateExternalServiceName(targetDCR, v1.Component_FE) // When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace host := serviceName + "." + targetDCR.Namespace maps, _ := k8s.GetConfig(ctx, k8sclient, &targetDCR.Spec.FeSpec.ConfigMapInfo, targetDCR.Namespace, v1.Component_FE) queryPort := resource.GetPort(maps, resource.QUERY_PORT) // connect to doris sql to get master node // It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail. dbConf := mysql.DBConfig{ User: adminUserName, Password: password, Host: host, Port: strconv.FormatInt(int64(queryPort), 10), Database: "mysql", } masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) if err != nil { klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) return err } defer masterDBClient.Close() // get all Observes allObserves, err := masterDBClient.GetObservers() if err != nil { klog.Errorf("DropObserverFromSqlClient failed, GetObservers err:%s", err.Error()) return err } // make sure needRemovedAmount, this may involve retrying tasks and scaling down followers. electionNumber := targetDCR.GetElectionNumber() // means: needRemovedAmount = allobservers - (replicas - election) needRemovedAmount := int32(len(allObserves)) - *(targetDCR.Spec.FeSpec.Replicas) + electionNumber if needRemovedAmount <= 0 { klog.Errorf("DropObserverFromSqlClient failed, Observers number(%d) is not larger than scale number(%d) ", len(allObserves), *(targetDCR.Spec.FeSpec.Replicas)-electionNumber) return nil } // get scale Observes var frontendMap map[int]*mysql.Frontend // frontendMap key is fe pod index ,value is frontend podTemplateName := resource.GeneratePodTemplateName(targetDCR, v1.Component_FE) if resource.GetStartMode(maps) == resource.START_MODEL_FQDN { // use host frontendMap, err = mysql.BuildSeqNumberToFrontendMap(allObserves, nil, podTemplateName) if err != nil { klog.Errorf("DropObserverFromSqlClient failed, buildSeqNumberToFrontend err:%s", err.Error()) return nil } } else { // use ip podMap := make(map[string]string) // key is pod ip, value is pod name pods, err := k8s.GetPods(ctx, k8sclient, targetDCR.Namespace, v1.GetPodLabels(targetDCR, v1.Component_FE)) if err != nil { klog.Errorf("DropObserverFromSqlClient failed, GetPods err:%s", err) return nil } for _, item := range pods.Items { if strings.HasPrefix(item.GetName(), podTemplateName) { podMap[item.Status.PodIP] = item.GetName() } } frontendMap, err = mysql.BuildSeqNumberToFrontendMap(allObserves, podMap, podTemplateName) if err != nil { klog.Errorf("DropObserverFromSqlClient failed, buildSeqNumberToFrontend err:%s", err.Error()) return nil } } observes := mysql.FindNeedDeletedObservers(frontendMap, needRemovedAmount) // drop node and return return masterDBClient.DropObserver(observes) }