func()

in pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go [249:297]


// reconcileStatefulset creates the FE StatefulSet when it does not exist yet,
// otherwise it reconciles the existing one against the desired object st.
//
// Steps performed for an existing StatefulSet:
//  1. The requested FE replica count is raised to cluster.GetElectionNumber()
//     when it is smaller (a nil Replicas is treated as 0); the correction is
//     written to both cluster.Spec.FeSpec.Replicas and st.Spec.Replicas and a
//     warning event is recorded.
//  2. When the effective replica count is lower than the existing StatefulSet's
//     replicas, or a previous attempt left FEStatus.Phase at ScaleDownFailed,
//     dropFEBySQLClient is called before the StatefulSet is changed.
//  3. The desired StatefulSet is applied via k8s.ApplyStatefulSet.
//
// It returns a warning event together with the error when creating, scaling
// down or applying fails; a failure to read the existing StatefulSet returns
// only the error. On success both results are nil.
func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *v1.DorisDisaggregatedCluster) (*sc.Event, error) {
	var est appv1.StatefulSet
	err := dfc.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est)
	if apierrors.IsNotFound(err) {
		if err = k8s.CreateClientObject(ctx, dfc.K8sclient, st); err != nil {
			klog.Errorf("disaggregatedFEController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
			return &sc.Event{Type: sc.EventWarning, Reason: sc.FECreateResourceFailed, Message: err.Error()}, err
		}

		return nil, nil
	}
	if err != nil {
		klog.Errorf("disaggregatedFEController reconcileStatefulset get statefulset failed, namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
		return nil, err
	}

	// A nil Replicas is treated as 0 so that it is corrected to the election number below.
	var replicas int32
	if cluster.Spec.FeSpec.Replicas != nil {
		replicas = *cluster.Spec.FeSpec.Replicas
	}
	electionNumber := cluster.GetElectionNumber()
	if replicas < electionNumber {
		dfc.K8srecorder.Event(cluster, string(sc.EventWarning), string(sc.FESpecSetError), "The number of disaggregated fe ElectionNumber is large than Replicas, Replicas has been corrected to the correct minimum value")
		// Log the local copy: cluster.Spec.FeSpec.Replicas may be nil here.
		klog.Errorf("disaggregatedFEController reconcileStatefulset disaggregatedDorisCluster namespace=%s,name=%s ,The number of disaggregated fe ElectionNumber(%d) is large than Replicas(%d)", cluster.Namespace, cluster.Name, electionNumber, replicas)

		// Give each spec its own copy so a later write through one pointer
		// cannot silently change the other.
		clusterReplicas, stReplicas := electionNumber, electionNumber
		cluster.Spec.FeSpec.Replicas = &clusterReplicas
		st.Spec.Replicas = &stReplicas
		// Keep the local in sync so the scale check uses the corrected value.
		replicas = electionNumber
	}

	// Kubernetes documents 1 as the default for an unspecified StatefulSet
	// spec.replicas; fall back to it rather than dereferencing a nil pointer.
	existingReplicas := int32(1)
	if est.Spec.Replicas != nil {
		existingReplicas = *est.Spec.Replicas
	}

	// fe scale check and set FEStatus phase: a negative delta means scale down.
	replicaDelta := replicas - existingReplicas

	// On scale down (or when a previous scale down failed) drop the FE nodes
	// through dropFEBySQLClient before the StatefulSet is shrunk.
	if replicaDelta < 0 || cluster.Status.FEStatus.Phase == v1.ScaleDownFailed {
		if err = dfc.dropFEBySQLClient(ctx, dfc.K8sclient, cluster); err != nil {
			cluster.Status.FEStatus.Phase = v1.ScaleDownFailed
			klog.Errorf("ScaleDownFE failed, err:%s ", err.Error())
			return &sc.Event{Type: sc.EventWarning, Reason: sc.FEHTTPFailed, Message: err.Error()},
				err
		}
		cluster.Status.FEStatus.Phase = v1.Scaling
	}

	// apply fe StatefulSet; equality is decided by StatefulsetDeepEqualWithKey
	// using the DisaggregatedSpecHashValueAnnotation key.
	if err = k8s.ApplyStatefulSet(ctx, dfc.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
		return resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
	}); err != nil {
		klog.Errorf("disaggregatedFEController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
		return &sc.Event{Type: sc.EventWarning, Reason: sc.FEApplyResourceFailed, Message: err.Error()}, err
	}
	return nil, nil
}