in pkg/controller/sub_controller/fe/controller.go [81:143]
func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error {
if cluster.Spec.FeSpec == nil {
klog.Info("fe Controller Sync ", "the fe component is not needed ", "namespace ", cluster.Namespace, " doris cluster name ", cluster.Name)
return nil
}
var oldStatus v1.ComponentStatus
if cluster.Status.FEStatus != nil {
oldStatus = *(cluster.Status.FEStatus.DeepCopy())
}
fc.InitStatus(cluster, v1.Component_FE)
if cluster.Spec.EnableRestartWhenConfigChange {
fc.CompareConfigmapAndTriggerRestart(cluster, oldStatus, v1.Component_FE)
}
feSpec := cluster.Spec.FeSpec
//get the fe configMap for resolve ports.
config, err := fc.GetConfig(ctx, &feSpec.BaseSpec.ConfigMapInfo, cluster.Namespace, v1.Component_FE)
if err != nil {
klog.Error("fe Controller Sync ", "resolve fe configmap failed, namespace ", cluster.Namespace, " error :", err)
return err
}
fc.CheckConfigMountPath(cluster, v1.Component_FE)
fc.CheckSecretMountPath(cluster, v1.Component_FE)
fc.CheckSecretExist(ctx, cluster, v1.Component_FE)
fc.CheckSharedPVC(ctx, cluster)
//generate new fe service.
svc := resource.BuildExternalService(cluster, v1.Component_FE, config)
//create or update fe external and domain search service, update the status of fe on src.
internalService := resource.BuildInternalService(cluster, v1.Component_FE, config)
if err := k8s.ApplyService(ctx, fc.K8sclient, &internalService, resource.ServiceDeepEqual); err != nil {
klog.Errorf("fe controller sync apply internalService name=%s, namespace=%s, clusterName=%s failed.message=%s.",
internalService.Name, internalService.Namespace, cluster.Name, err.Error())
return err
}
if err := k8s.ApplyService(ctx, fc.K8sclient, &svc, resource.ServiceDeepEqual); err != nil {
klog.Errorf("fe controller sync apply external service name=%s, namespace=%s, clusterName=%s failed. message=%s.",
svc.Name, svc.Namespace, cluster.Name, err.Error())
return err
}
if !fc.PrepareReconcileResources(ctx, cluster, v1.Component_FE) {
klog.Infof("fe controller sync preparing resource for reconciling namespace %s name %s!", cluster.Namespace, cluster.Name)
return nil
}
if err = fc.prepareStatefulsetApply(ctx, cluster, oldStatus); err != nil {
return err
}
st := fc.buildFEStatefulSet(cluster, config)
if err = k8s.ApplyStatefulSet(ctx, fc.K8sclient, &st, func(new *appv1.StatefulSet, old *appv1.StatefulSet) bool {
fc.RestrictConditionsEqual(new, old)
return resource.StatefulSetDeepEqual(new, old, false)
}); err != nil {
klog.Errorf("fe controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.",
st.Name, st.Namespace, cluster.Name, err.Error())
return err
}
return nil
}