in pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go [63:105]
// Sync validates and reconciles the compute groups declared in the spec of a
// DorisDisaggregatedCluster.
//
// obj must be a *dv1.DorisDisaggregatedCluster; the type assertion panics
// otherwise.
//
// Sync returns nil without doing any work when the spec has no compute groups
// or when feAvailable reports false; in both cases only an event is recorded.
// It returns an error when validateComputeGroup rejects the spec, or when one
// or more compute groups fail to sync. A failing compute group does not stop
// the remaining groups from being processed; all failures are reported
// together in the returned error.
func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj client.Object) error {
	ddc := obj.(*dv1.DorisDisaggregatedCluster)
	if len(ddc.Spec.ComputeGroups) == 0 {
		klog.Errorf("disaggregatedComputeGroupsController sync disaggregatedDorisCluster namespace=%s,name=%s have not compute group spec.", ddc.Namespace, ddc.Name)
		dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.ComputeGroupsEmpty), "compute group empty, the cluster will not work normal.")
		return nil
	}

	if !dcgs.feAvailable(ddc) {
		dcgs.K8srecorder.Event(ddc, string(sc.EventNormal), string(sc.WaitFEAvailable), "fe have not ready.")
		return nil
	}

	// validating compute group information.
	if event, ok := dcgs.validateComputeGroup(ddc.Spec.ComputeGroups); !ok {
		klog.Errorf("disaggregatedComputeGroupsController namespace=%s name=%s validateComputeGroup have not match specifications %s.", ddc.Namespace, ddc.Name, sc.EventString(event))
		if event != nil {
			// Event, not Eventf: the message is data, not a format string, so
			// a literal '%' in it must not be interpreted as a verb.
			dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
		}
		return errors.New("validating compute group failed")
	}

	var errs []error
	cgs := ddc.Spec.ComputeGroups
	// Index into the slice so computeGroupSync receives a pointer to the
	// element stored in the spec rather than to a loop-local copy.
	for i := range cgs {
		event, err := dcgs.computeGroupSync(ctx, ddc, &cgs[i])
		if err == nil {
			continue
		}
		if event != nil {
			dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
		}
		errs = append(errs, err)
		// Log the error itself as well as the event: the event may be nil, in
		// which case the event string alone carries no failure detail.
		klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s, event=%s", cgs[i].UniqueId, err.Error(), sc.EventString(event))
	}

	if len(errs) == 0 {
		return nil
	}

	msg := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name)
	for i, err := range errs {
		// Separate individual failures so they remain distinguishable.
		if i > 0 {
			msg += "; "
		}
		msg += err.Error()
	}
	return errors.New(msg)
}