func()

in pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go [308:379]


func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8sclient client.Client, cluster *v1.DorisDisaggregatedCluster) error {
	// get adminuserName and pwd
	adminUserName, password := dfc.GetManagementAdminUserAndPWD(ctx, cluster)

	// get host and port
	// When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace
	host := cluster.GetFEVIPAddresss()
	confMap := dfc.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps)
	queryPort := resource.GetPort(confMap, resource.QUERY_PORT)

	// connect to doris sql to get master node
	// It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail.
	dbConf := mysql.DBConfig{
		User:     adminUserName,
		Password: password,
		Host:     host,
		Port:     strconv.FormatInt(int64(queryPort), 10),
		Database: "mysql",
	}
	masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
	if err != nil {
		klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
		return err
	}
	defer masterDBClient.Close()

	allObserves, err := masterDBClient.GetObservers()
	if err != nil {
		klog.Errorf("dropFEFromSQLClient failed, GetObservers err:%s", err.Error())
		return err
	}

	// means: needRemovedAmount = allobservers - (replicas - election)
	electionNumber := cluster.GetElectionNumber()
	needRemovedAmount := int32(len(allObserves)) - *(cluster.Spec.FeSpec.Replicas) + electionNumber
	if needRemovedAmount <= 0 {
		klog.Errorf("dropFEFromSQLClient failed, Observers number(%d) is not larger than scale number(%d) ", len(allObserves), *(cluster.Spec.FeSpec.Replicas)-electionNumber)
		return nil
	}

	// get will delete Observes
	var frontendMap map[int]*mysql.Frontend // frontendMap key is fe pod index ,value is frontend
	stsName := cluster.GetFEStatefulsetName()

	if resource.GetStartMode(confMap) == resource.START_MODEL_FQDN { // use host
		frontendMap, err = mysql.BuildSeqNumberToFrontendMap(allObserves, nil, stsName)
		if err != nil {
			klog.Errorf("dropFEFromSQLClient failed, buildSeqNumberToFrontend err:%s", err.Error())
			return nil
		}
	} else { // use ip
		podMap := make(map[string]string) // key is pod ip, value is pod name
		pods, err := k8s.GetPods(ctx, k8sclient, cluster.Namespace, dfc.getFEPodLabels(cluster))
		if err != nil {
			klog.Errorf("dropFEFromSQLClient failed, GetPods err:%s", err)
			return nil
		}
		for _, item := range pods.Items {
			if strings.HasPrefix(item.GetName(), stsName) {
				podMap[item.Status.PodIP] = item.GetName()
			}
		}
		frontendMap, err = mysql.BuildSeqNumberToFrontendMap(allObserves, podMap, stsName)
		if err != nil {
			klog.Errorf("dropFEFromSQLClient failed, buildSeqNumberToFrontend err:%s", err.Error())
			return nil
		}
	}
	observes := mysql.FindNeedDeletedObservers(frontendMap, needRemovedAmount)
	// drop node and return
	return masterDBClient.DropObserver(observes)
}