in pkg/controller/sub_controller/fe/prepare_modify.go [101:177]
func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error {
// get adminuserName and pwd
secret, _ := k8s.GetSecret(ctx, k8sclient, targetDCR.Namespace, targetDCR.Spec.AuthSecret)
adminUserName, password := v1.GetClusterSecret(targetDCR, secret)
// get host and port
serviceName := v1.GenerateExternalServiceName(targetDCR, v1.Component_FE)
// When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace
host := serviceName + "." + targetDCR.Namespace
maps, _ := k8s.GetConfig(ctx, k8sclient, &targetDCR.Spec.FeSpec.ConfigMapInfo, targetDCR.Namespace, v1.Component_FE)
queryPort := resource.GetPort(maps, resource.QUERY_PORT)
// connect to doris sql to get master node
// It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail.
dbConf := mysql.DBConfig{
User: adminUserName,
Password: password,
Host: host,
Port: strconv.FormatInt(int64(queryPort), 10),
Database: "mysql",
}
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return err
}
defer masterDBClient.Close()
// get all Observes
allObserves, err := masterDBClient.GetObservers()
if err != nil {
klog.Errorf("DropObserverFromSqlClient failed, GetObservers err:%s", err.Error())
return err
}
// make sure needRemovedAmount, this may involve retrying tasks and scaling down followers.
electionNumber := targetDCR.GetElectionNumber()
// means: needRemovedAmount = allobservers - (replicas - election)
needRemovedAmount := int32(len(allObserves)) - *(targetDCR.Spec.FeSpec.Replicas) + electionNumber
if needRemovedAmount <= 0 {
klog.Errorf("DropObserverFromSqlClient failed, Observers number(%d) is not larger than scale number(%d) ", len(allObserves), *(targetDCR.Spec.FeSpec.Replicas)-electionNumber)
return nil
}
// get scale Observes
var frontendMap map[int]*mysql.Frontend // frontendMap key is fe pod index ,value is frontend
podTemplateName := resource.GeneratePodTemplateName(targetDCR, v1.Component_FE)
if resource.GetStartMode(maps) == resource.START_MODEL_FQDN { // use host
frontendMap, err = mysql.BuildSeqNumberToFrontendMap(allObserves, nil, podTemplateName)
if err != nil {
klog.Errorf("DropObserverFromSqlClient failed, buildSeqNumberToFrontend err:%s", err.Error())
return nil
}
} else { // use ip
podMap := make(map[string]string) // key is pod ip, value is pod name
pods, err := k8s.GetPods(ctx, k8sclient, targetDCR.Namespace, v1.GetPodLabels(targetDCR, v1.Component_FE))
if err != nil {
klog.Errorf("DropObserverFromSqlClient failed, GetPods err:%s", err)
return nil
}
for _, item := range pods.Items {
if strings.HasPrefix(item.GetName(), podTemplateName) {
podMap[item.Status.PodIP] = item.GetName()
}
}
frontendMap, err = mysql.BuildSeqNumberToFrontendMap(allObserves, podMap, podTemplateName)
if err != nil {
klog.Errorf("DropObserverFromSqlClient failed, buildSeqNumberToFrontend err:%s", err.Error())
return nil
}
}
observes := mysql.FindNeedDeletedObservers(frontendMap, needRemovedAmount)
// drop node and return
return masterDBClient.DropObserver(observes)
}