in pkg/controller/sub_controller/be/controller.go [53:119]
func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error {
if dcr.Spec.BeSpec == nil {
return nil
}
var oldStatus v1.ComponentStatus
if dcr.Status.BEStatus != nil {
oldStatus = *(dcr.Status.BEStatus.DeepCopy())
}
be.InitStatus(dcr, v1.Component_BE)
if !be.FeAvailable(dcr) {
return nil
}
if dcr.Spec.EnableRestartWhenConfigChange {
be.CompareConfigmapAndTriggerRestart(dcr, oldStatus, v1.Component_BE)
}
beSpec := dcr.Spec.BeSpec
//get the be configMap for resolve ports.
//2. get config for generate statefulset and service.
config, err := be.GetConfig(ctx, &beSpec.ConfigMapInfo, dcr.Namespace, v1.Component_BE)
if err != nil {
klog.Error("BeController Sync ", "resolve be configmap failed, namespace ", dcr.Namespace, " error :", err)
return err
}
be.CheckConfigMountPath(dcr, v1.Component_BE)
be.CheckSecretMountPath(dcr, v1.Component_BE)
be.CheckSecretExist(ctx, dcr, v1.Component_BE)
//generate new be service.
svc := resource.BuildExternalService(dcr, v1.Component_BE, config)
//create or update be external and domain search service, update the status of fe on src.
internalService := resource.BuildInternalService(dcr, v1.Component_BE, config)
if err := k8s.ApplyService(ctx, be.K8sclient, &internalService, resource.ServiceDeepEqual); err != nil {
klog.Errorf("be controller sync apply internalService name=%s, namespace=%s, clusterName=%s failed.message=%s.",
internalService.Name, internalService.Namespace, dcr.Name, err.Error())
return err
}
if err := k8s.ApplyService(ctx, be.K8sclient, &svc, resource.ServiceDeepEqual); err != nil {
klog.Errorf("be controller sync apply external service name=%s, namespace=%s, clusterName=%s failed. message=%s.",
svc.Name, svc.Namespace, dcr.Name, err.Error())
return err
}
if err = be.prepareStatefulsetApply(dcr, oldStatus); err != nil {
return err
}
st := be.buildBEStatefulSet(dcr, config)
if !be.PrepareReconcileResources(ctx, dcr, v1.Component_BE) {
klog.Infof("be controller sync preparing resource for reconciling namespace %s name %s!", dcr.Namespace, dcr.Name)
return nil
}
if err = k8s.ApplyStatefulSet(ctx, be.K8sclient, &st, func(new *appv1.StatefulSet, est *appv1.StatefulSet) bool {
// if have restart annotation, we should exclude the interference for comparison.
be.RestrictConditionsEqual(new, est)
return resource.StatefulSetDeepEqual(new, est, false)
}); err != nil {
klog.Errorf("fe controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.",
st.Name, st.Namespace, dcr.Name, err.Error())
return err
}
return nil
}