in pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go [308:379]
// dropFEBySQLClient drops surplus FE observer nodes through a SQL connection
// to the Doris cluster described by cluster.
//
// Steps:
//  1. Build a SQL connection from the management admin credentials, the FE
//     address returned by cluster.GetFEVIPAddresss and the query port resolved
//     from the FE config maps.
//  2. List the observers and compute
//     needRemovedAmount = len(observers) - (spec replicas - election number).
//  3. Map the observers to frontends keyed by pod index and let
//     mysql.FindNeedDeletedObservers pick needRemovedAmount of them.
//  4. Drop the picked observers.
//
// It returns nil when there is nothing to remove. Every failure (connecting,
// listing observers, listing pods, building the index map, dropping) is
// logged and returned, so the caller never treats a failed drop as a success.
//
// NOTE(review): cluster.Spec.FeSpec.Replicas is dereferenced without a nil
// check; this assumes the caller has validated or defaulted it - confirm.
func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8sclient client.Client, cluster *v1.DorisDisaggregatedCluster) error {
	// Credentials used for the management connection.
	adminUserName, password := dfc.GetManagementAdminUserAndPWD(ctx, cluster)

	// Host and port of the FE query endpoint.
	// NOTE(review): the operator and the cluster resource may live in different
	// namespaces; presumably GetFEVIPAddresss returns a namespace-qualified
	// address so the service stays reachable - confirm.
	host := cluster.GetFEVIPAddresss()
	confMap := dfc.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps)
	queryPort := resource.GetPort(confMap, resource.QUERY_PORT)

	// The address above may resolve to a non-master FE, or even to a node that
	// is about to be removed, which would make the drop statement fail, so a
	// master connection is requested explicitly.
	dbConf := mysql.DBConfig{
		User:     adminUserName,
		Password: password,
		Host:     host,
		Port:     strconv.FormatInt(int64(queryPort), 10),
		Database: "mysql",
	}
	masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
	if err != nil {
		klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
		return err
	}
	defer masterDBClient.Close()

	allObserves, err := masterDBClient.GetObservers()
	if err != nil {
		klog.Errorf("dropFEFromSQLClient failed, GetObservers err:%s", err.Error())
		return err
	}

	// needRemovedAmount = allObservers - (replicas - election)
	electionNumber := cluster.GetElectionNumber()
	needRemovedAmount := int32(len(allObserves)) - *(cluster.Spec.FeSpec.Replicas) + electionNumber
	if needRemovedAmount <= 0 {
		// Nothing to drop: the observer count does not exceed the desired count.
		klog.Errorf("dropFEFromSQLClient failed, Observers number(%d) is not larger than scale number(%d) ", len(allObserves), *(cluster.Spec.FeSpec.Replicas)-electionNumber)
		return nil
	}

	stsName := cluster.GetFEStatefulsetName()

	// podMap maps pod IP to pod name. It stays nil in FQDN mode, where the
	// original code also passed nil to BuildSeqNumberToFrontendMap; in IP mode
	// it is filled from the FE pods that belong to the statefulset.
	var podMap map[string]string
	if resource.GetStartMode(confMap) != resource.START_MODEL_FQDN {
		pods, podsErr := k8s.GetPods(ctx, k8sclient, cluster.Namespace, dfc.getFEPodLabels(cluster))
		if podsErr != nil {
			klog.Errorf("dropFEFromSQLClient failed, GetPods err:%s", podsErr)
			// Propagate instead of returning nil: no observer has been dropped.
			return podsErr
		}
		podMap = make(map[string]string)
		for _, item := range pods.Items {
			if strings.HasPrefix(item.GetName(), stsName) {
				podMap[item.Status.PodIP] = item.GetName()
			}
		}
	}

	// frontendMap key is the FE pod index, value is the frontend.
	frontendMap, err := mysql.BuildSeqNumberToFrontendMap(allObserves, podMap, stsName)
	if err != nil {
		klog.Errorf("dropFEFromSQLClient failed, buildSeqNumberToFrontend err:%s", err.Error())
		// Propagate instead of returning nil: no observer has been dropped.
		return err
	}

	// Pick the observers to delete, then drop them.
	observes := mysql.FindNeedDeletedObservers(frontendMap, needRemovedAmount)
	return masterDBClient.DropObserver(observes)
}