func()

in pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go [63:105]


func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj client.Object) error {
	ddc := obj.(*dv1.DorisDisaggregatedCluster)
	if len(ddc.Spec.ComputeGroups) == 0 {
		klog.Errorf("disaggregatedComputeGroupsController sync disaggregatedDorisCluster namespace=%s,name=%s have not compute group spec.", ddc.Namespace, ddc.Name)
		dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.ComputeGroupsEmpty), "compute group empty, the cluster will not work normal.")
		return nil
	}

	if !dcgs.feAvailable(ddc) {
		dcgs.K8srecorder.Event(ddc, string(sc.EventNormal), string(sc.WaitFEAvailable), "fe have not ready.")
		return nil
	}

	// validating compute group information.
	if event, res := dcgs.validateComputeGroup(ddc.Spec.ComputeGroups); !res {
		klog.Errorf("disaggregatedComputeGroupsController namespace=%s name=%s validateComputeGroup have not match specifications %s.", ddc.Namespace, ddc.Name, sc.EventString(event))
		dcgs.K8srecorder.Eventf(ddc, string(event.Type), string(event.Reason), event.Message)
		return errors.New("validating compute group failed")
	}

	var errs []error
	cgs := ddc.Spec.ComputeGroups
	for i, _ := range cgs {

		if event, err := dcgs.computeGroupSync(ctx, ddc, &cgs[i]); err != nil {
			if event != nil {
				dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
			}
			errs = append(errs, err)
			klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s  sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
		}
	}

	if len(errs) != 0 {
		msg := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name)
		for _, err := range errs {
			msg += err.Error()
		}
		return errors.New(msg)
	}

	return nil
}