func()

in pkg/controller/sub_controller/be/controller.go [53:119]


func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error {
	if dcr.Spec.BeSpec == nil {
		return nil
	}

	var oldStatus v1.ComponentStatus
	if dcr.Status.BEStatus != nil {
		oldStatus = *(dcr.Status.BEStatus.DeepCopy())
	}
	be.InitStatus(dcr, v1.Component_BE)
	if !be.FeAvailable(dcr) {
		return nil
	}

	if dcr.Spec.EnableRestartWhenConfigChange {
		be.CompareConfigmapAndTriggerRestart(dcr, oldStatus, v1.Component_BE)
	}

	beSpec := dcr.Spec.BeSpec
	//get the be configMap for resolve ports.
	//2. get config for generate statefulset and service.
	config, err := be.GetConfig(ctx, &beSpec.ConfigMapInfo, dcr.Namespace, v1.Component_BE)
	if err != nil {
		klog.Error("BeController Sync ", "resolve be configmap failed, namespace ", dcr.Namespace, " error :", err)
		return err
	}

	be.CheckConfigMountPath(dcr, v1.Component_BE)
	be.CheckSecretMountPath(dcr, v1.Component_BE)
	be.CheckSecretExist(ctx, dcr, v1.Component_BE)
	//generate new be service.
	svc := resource.BuildExternalService(dcr, v1.Component_BE, config)
	//create or update be external and domain search service, update the status of fe on src.
	internalService := resource.BuildInternalService(dcr, v1.Component_BE, config)
	if err := k8s.ApplyService(ctx, be.K8sclient, &internalService, resource.ServiceDeepEqual); err != nil {
		klog.Errorf("be controller sync apply internalService name=%s, namespace=%s, clusterName=%s failed.message=%s.",
			internalService.Name, internalService.Namespace, dcr.Name, err.Error())
		return err
	}
	if err := k8s.ApplyService(ctx, be.K8sclient, &svc, resource.ServiceDeepEqual); err != nil {
		klog.Errorf("be controller sync apply external service name=%s, namespace=%s, clusterName=%s failed. message=%s.",
			svc.Name, svc.Namespace, dcr.Name, err.Error())
		return err
	}

	if err = be.prepareStatefulsetApply(dcr, oldStatus); err != nil {
		return err
	}

	st := be.buildBEStatefulSet(dcr, config)
	if !be.PrepareReconcileResources(ctx, dcr, v1.Component_BE) {
		klog.Infof("be controller sync preparing resource for reconciling namespace %s name %s!", dcr.Namespace, dcr.Name)
		return nil
	}

	if err = k8s.ApplyStatefulSet(ctx, be.K8sclient, &st, func(new *appv1.StatefulSet, est *appv1.StatefulSet) bool {
		// if have restart annotation, we should exclude the interference for comparison.
		be.RestrictConditionsEqual(new, est)
		return resource.StatefulSetDeepEqual(new, est, false)
	}); err != nil {
		klog.Errorf("fe controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.",
			st.Name, st.Namespace, dcr.Name, err.Error())
		return err
	}

	return nil
}