pkg/controller/sub_controller/disaggregated_cluster/metaservice/controller.go (143 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package metaservice import ( "context" "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/resource" sc "github.com/apache/doris-operator/pkg/controller/sub_controller" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "strconv" ) type DisaggregatedMSController struct { sc.DisaggregatedSubDefaultController } func (dms *DisaggregatedMSController) ClearResources(ctx context.Context, obj client.Object) (bool, error) { ddc := obj.(*v1.DorisDisaggregatedCluster) statefulsetName := ddc.GetMSStatefulsetName() serviceName := ddc.GetMSServiceName() if ddc.DeletionTimestamp.IsZero() { return true, nil } if err := k8s.DeleteService(ctx, dms.K8sclient, ddc.Namespace, serviceName); err != nil { klog.Errorf("dms controller delete service namespace %s name %s failed, err=%s", ddc.Namespace, serviceName, err.Error()) dms.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.MSServiceDeletedFailed), err.Error()) return false, err } if err := k8s.DeleteStatefulset(ctx, dms.K8sclient, ddc.Namespace, statefulsetName); err != nil { klog.Errorf("dms controller delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, statefulsetName, err.Error()) dms.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.MSStatefulsetDeleteFailed), err.Error()) return false, err } return true, nil } func (dms *DisaggregatedMSController) GetControllerName() string { return dms.ControllerName } func (dms *DisaggregatedMSController) UpdateComponentStatus(obj client.Object) error { var availableReplicas int32 var creatingReplicas int32 var failedReplicas int32 ddc := obj.(*v1.DorisDisaggregatedCluster) msSpec := ddc.Spec.MetaService confMap := dms.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.MS_RESOLVEKEY, msSpec.ConfigMaps) port := resource.GetPort(confMap, resource.BRPC_LISTEN_PORT) msEndPoint := ddc.GetMSServiceName() + "." + ddc.Namespace + ":" + strconv.Itoa(int(port)) ddc.Status.MetaServiceStatus.MetaServiceEndpoint = msEndPoint token := resource.DefaultMsToken if v, ok := confMap[resource.DefaultMsTokenKey]; ok { token = v.(string) } ddc.Status.MetaServiceStatus.MsToken = token selector := dms.newMSPodsSelector(ddc.Name) var podList corev1.PodList if err := dms.K8sclient.List(context.Background(), &podList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil { return err } for _, pod := range podList.Items { if ready := k8s.PodIsReady(&pod.Status); ready { availableReplicas++ } else if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending { creatingReplicas++ } else { failedReplicas++ } } if availableReplicas > 0 { ddc.Status.MetaServiceStatus.AvailableStatus = v1.Available ddc.Status.MetaServiceStatus.Phase = v1.Ready } return nil } var _ sc.DisaggregatedSubController = &DisaggregatedMSController{} var ( metaServiceController = "metaServiceController" ) func New(mgr ctrl.Manager) *DisaggregatedMSController { return &DisaggregatedMSController{ sc.DisaggregatedSubDefaultController{ K8sclient: mgr.GetClient(), K8srecorder: mgr.GetEventRecorderFor(metaServiceController), ControllerName: metaServiceController, }} } func (dms *DisaggregatedMSController) Sync(ctx context.Context, obj client.Object) error { ddc := obj.(*v1.DorisDisaggregatedCluster) msSpec := ddc.Spec.MetaService confMap := dms.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.MS_RESOLVEKEY, msSpec.ConfigMaps) svc := dms.newService(ddc, confMap) st := dms.newStatefulset(ddc, confMap) dms.initMSStatus(ddc) dms.CheckSecretMountPath(ddc, ddc.Spec.MetaService.Secrets) dms.CheckSecretExist(ctx, ddc, ddc.Spec.MetaService.Secrets) event, err := dms.DefaultReconcileService(ctx, svc) if err != nil { if event != nil { dms.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message) } klog.Errorf("dms controller reconcile service namespace %s name %s failed, err=%s", svc.Namespace, svc.Name, err.Error()) return err } event, err = dms.reconcileStatefulset(ctx, st) if err != nil { if event != nil { dms.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message) } klog.Errorf("dms controller reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error()) return err } return nil } func (dms *DisaggregatedMSController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet) (*sc.Event, error) { var est appv1.StatefulSet if err := dms.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { if err = k8s.CreateClientObject(ctx, dms.K8sclient, st); err != nil { klog.Errorf("dms controller reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err } return nil, nil } else if err != nil { klog.Errorf("dms controller reconcileStatefulset get statefulset failed, namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return nil, err } if err := k8s.ApplyStatefulSet(ctx, dms.K8sclient, st, func(st, est *appv1.StatefulSet) bool { return resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false) }); err != nil { klog.Errorf("dms controller reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err } return nil, nil } func (dms *DisaggregatedMSController) initMSStatus(ddc *v1.DorisDisaggregatedCluster) { initPhase := v1.Reconciling if ddc.Status.MetaServiceStatus.Phase != "" { initPhase = ddc.Status.MetaServiceStatus.Phase } //re initial status to un available ddc.Status.MetaServiceStatus.AvailableStatus = v1.UnAvailable ddc.Status.MetaServiceStatus.Phase = initPhase }