in pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go [249:297]
func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *v1.DorisDisaggregatedCluster) (*sc.Event, error) {
var est appv1.StatefulSet
if err := dfc.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
if err = k8s.CreateClientObject(ctx, dfc.K8sclient, st); err != nil {
klog.Errorf("disaggregatedFEController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.FECreateResourceFailed, Message: err.Error()}, err
}
return nil, nil
} else if err != nil {
klog.Errorf("disaggregatedFEController reconcileStatefulset get statefulset failed, namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return nil, err
}
var replicas int32
if cluster.Spec.FeSpec.Replicas != nil {
replicas = *cluster.Spec.FeSpec.Replicas
}
electionNumber := cluster.GetElectionNumber()
if replicas < electionNumber {
dfc.K8srecorder.Event(cluster, string(sc.EventWarning), string(sc.FESpecSetError), "The number of disaggregated fe ElectionNumber is large than Replicas, Replicas has been corrected to the correct minimum value")
klog.Errorf("disaggregatedFEController reconcileStatefulset disaggregatedDorisCluster namespace=%s,name=%s ,The number of disaggregated fe ElectionNumber(%d) is large than Replicas(%d)", cluster.Namespace, cluster.Name, electionNumber, *(cluster.Spec.FeSpec.Replicas))
cluster.Spec.FeSpec.Replicas = &electionNumber
st.Spec.Replicas = &electionNumber
}
// fe scale check and set FEStatus phase
willRemovedAmount := replicas - *(est.Spec.Replicas)
// if fe scale, drop fe node by http
if willRemovedAmount < 0 || cluster.Status.FEStatus.Phase == v1.ScaleDownFailed {
if err := dfc.dropFEBySQLClient(ctx, dfc.K8sclient, cluster); err != nil {
cluster.Status.FEStatus.Phase = v1.ScaleDownFailed
klog.Errorf("ScaleDownFE failed, err:%s ", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.FEHTTPFailed, Message: err.Error()},
err
}
cluster.Status.FEStatus.Phase = v1.Scaling
}
// apply fe StatefulSet
if err := k8s.ApplyStatefulSet(ctx, dfc.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
}); err != nil {
klog.Errorf("disaggregatedFEController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.FEApplyResourceFailed, Message: err.Error()}, err
}
return nil, nil
}