pkg/controller/sub_controller/broker/controller.go (104 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package broker import ( "context" v1 "github.com/apache/doris-operator/api/doris/v1" "github.com/apache/doris-operator/pkg/common/utils" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/resource" "github.com/apache/doris-operator/pkg/controller/sub_controller" appv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "time" ) type Controller struct { sub_controller.SubDefaultController } const ( BROKER_SEARCH_SUFFIX = "-search" ) func New(k8sclient client.Client, k8srecorder record.EventRecorder) *Controller { return &Controller{ SubDefaultController: sub_controller.SubDefaultController{ K8sclient: k8sclient, K8srecorder: k8srecorder, }, } } func (bk *Controller) GetControllerName() string { return "brokerController" } func (bk *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error { if dcr.Spec.BrokerSpec == nil { return nil } if !bk.FeAvailable(dcr) { return nil } brokerSpec := dcr.Spec.BrokerSpec //get the broker configMap for resolve ports. //2. get config for generate statefulset and service. config, err := bk.GetConfig(ctx, &brokerSpec.ConfigMapInfo, dcr.Namespace, v1.Component_Broker) if err != nil { klog.Error("BrokerController Sync ", "resolve broker configmap failed, namespace ", dcr.Namespace, " error ", err) return err } bk.CheckConfigMountPath(dcr, v1.Component_Broker) bk.CheckSecretMountPath(dcr, v1.Component_Broker) bk.CheckSecretExist(ctx, dcr, v1.Component_Broker) internalService := resource.BuildInternalService(dcr, v1.Component_Broker, config) if err := k8s.ApplyService(ctx, bk.K8sclient, &internalService, resource.ServiceDeepEqual); err != nil { klog.Errorf("broker controller sync apply internalService name=%s, namespace=%s, clusterName=%s failed.message=%s.", internalService.Name, internalService.Namespace, dcr.Name, err.Error()) return err } st := bk.buildBKStatefulSet(dcr, config) if err = k8s.ApplyStatefulSet(ctx, bk.K8sclient, &st, func(new *appv1.StatefulSet, est *appv1.StatefulSet) bool { // if have restart annotation, we should exclude the interference for comparison. return resource.StatefulSetDeepEqual(new, est, false) }); err != nil { klog.Errorf("broker controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.", st.Name, st.Namespace, dcr.Name, err.Error()) return err } return nil } func (bk *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error { if cluster.Spec.BrokerSpec == nil { cluster.Status.BrokerStatus = nil return nil } bs := &v1.ComponentStatus{ ComponentCondition: v1.ComponentCondition{ SubResourceName: v1.GenerateComponentStatefulSetName(cluster, v1.Component_Broker), Phase: v1.Reconciling, LastTransitionTime: metav1.NewTime(time.Now()), }, } if cluster.Status.BrokerStatus != nil { bs = cluster.Status.BrokerStatus.DeepCopy() } cluster.Status.BrokerStatus = bs bs.AccessService = v1.GenerateExternalServiceName(cluster, v1.Component_Broker) return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas, v1.Component_Broker) } func (bk *Controller) ClearResources(ctx context.Context, dcr *v1.DorisCluster) (bool, error) { //if the doris is not have broker. if dcr.Status.BrokerStatus == nil { return true, nil } if dcr.Spec.BrokerSpec == nil { return bk.ClearCommonResources(ctx, dcr, v1.Component_Broker) } return true, nil } func (bk *Controller) getFeConfig(ctx context.Context, feconfigMapInfo *v1.ConfigMapInfo, namespace string) (map[string]interface{}, error) { cms := resource.GetMountConfigMapInfo(*feconfigMapInfo) if len(cms) == 0 { return make(map[string]interface{}), nil } feconfigMaps, err := k8s.GetConfigMaps(ctx, bk.K8sclient, namespace, cms) if err != nil { klog.Errorf("BrokerController getFeConfig fe config failed, namespace: %s,err: %s \n", namespace, err.Error()) } res, resolveErr := resource.ResolveConfigMaps(feconfigMaps, v1.Component_FE) return res, utils.MergeError(err, resolveErr) }