pkg/controller/sub_controller/cn/controller.go (213 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cn
import (
"context"
dorisv1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/apache/doris-operator/pkg/common/utils"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/resource"
"github.com/apache/doris-operator/pkg/controller/sub_controller"
appv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)
// Controller is the sub-controller that reconciles the CN component of a
// DorisCluster. It embeds sub_controller.SubDefaultController, which holds the
// Kubernetes client and event recorder populated by New.
type Controller struct {
sub_controller.SubDefaultController
}
const (
// CN_SEARCH_SUFFIX is the "-search" name suffix.
// NOTE(review): it is not referenced in the visible part of this file;
// presumably it is appended to CN resource names elsewhere — confirm against callers.
CN_SEARCH_SUFFIX = "-search"
)
// GetControllerName returns the fixed identifier of the CN sub-controller.
func (cn *Controller) GetControllerName() string {
	const controllerName = "cnController"
	return controllerName
}
// New builds a CN Controller whose embedded SubDefaultController uses the
// given Kubernetes client and event recorder.
func New(k8sclient client.Client, k8srecorder record.EventRecorder) *Controller {
	ctrl := new(Controller)
	ctrl.SubDefaultController.K8sclient = k8sclient
	ctrl.SubDefaultController.K8srecorder = k8srecorder
	return ctrl
}
// Sync reconciles the CN component of dcr.
//
// Flow:
//   - dcr.Spec.CnSpec == nil: only ClearResources is run and its error, if any,
//     is returned.
//   - cn.FeAvailable(dcr) is false: nothing is done and nil is returned.
//   - otherwise the CN config is resolved, the mount-path and secret checks are
//     run, the internal and external services are applied, the statefulset is
//     built and applied, and, when an AutoScalingPolicy is set, the autoscaler
//     is created or updated.
//
// The first error met along the way is returned, including a failure to deploy
// the autoscaler, so the caller can retry the reconcile.
func (cn *Controller) Sync(ctx context.Context, dcr *dorisv1.DorisCluster) error {
	if dcr.Spec.CnSpec == nil {
		if _, err := cn.ClearResources(ctx, dcr); err != nil {
			klog.Errorf("cn controller sync clearResource namespace=%s,srcName=%s, err=%s\n", dcr.Namespace, dcr.Name, err.Error())
			return err
		}
		return nil
	}

	if !cn.FeAvailable(dcr) {
		return nil
	}

	cnSpec := dcr.Spec.CnSpec
	config, err := cn.GetConfig(ctx, &cnSpec.ConfigMapInfo, dcr.Namespace)
	if err != nil {
		klog.Errorf("cn controller sync resolve cn configMap failed, namespace %s ,err : %s", dcr.Namespace, err.Error())
		return err
	}

	cn.CheckConfigMountPath(dcr, dorisv1.Component_CN)
	cn.CheckSecretMountPath(dcr, dorisv1.Component_CN)
	cn.CheckSecretExist(ctx, dcr, dorisv1.Component_CN)

	svc := resource.BuildExternalService(dcr, dorisv1.Component_CN, config)
	internalSVC := resource.BuildInternalService(dcr, dorisv1.Component_CN, config)
	// The internal service is applied before the external one.
	if err := k8s.ApplyService(ctx, cn.K8sclient, &internalSVC, resource.ServiceDeepEqual); err != nil {
		klog.Errorf("cn controller sync apply internalService name=%s, namespace=%s, clusterName=%s failed.message=%s.",
			internalSVC.Name, internalSVC.Namespace, dcr.Name, err.Error())
		return err
	}
	if err := k8s.ApplyService(ctx, cn.K8sclient, &svc, resource.ServiceDeepEqual); err != nil {
		klog.Errorf("cn controller sync apply externalService name=%s, namespace=%s, clusterName=%s failed.message=%s.",
			svc.Name, svc.Namespace, dcr.Name, err.Error())
		return err
	}

	cnStatefulSet := cn.buildCnStatefulSet(dcr, config)
	if !cn.PrepareReconcileResources(ctx, dcr, dorisv1.Component_CN) {
		klog.Infof("cn controller sync preparing resource for reconciling namespace %s name %s!", dcr.Namespace, dcr.Name)
		return nil
	}

	if err = cn.applyStatefulSet(ctx, &cnStatefulSet, cnSpec.AutoScalingPolicy != nil); err != nil {
		klog.Errorf("cn controller sync statefulset name=%s, namespace=%s,failed. message=%s.",
			cnStatefulSet.Name, cnStatefulSet.Namespace, err.Error())
		return err
	}

	// Create or update the autoscaler. deployAutoScaler logs its own failure;
	// the error is returned instead of being dropped so the reconcile is not
	// reported as successful when the autoscaler could not be deployed.
	if cnSpec.AutoScalingPolicy != nil {
		return cn.deployAutoScaler(ctx, *cnSpec.AutoScalingPolicy, &cnStatefulSet, dcr)
	}
	return nil
}
// UpdateComponentStatus rebuilds cluster.Status.CnStatus.
//
// Without a CN spec the status is set to nil. Otherwise a fresh CnStatus in the
// Reconciling phase is stored (with HorizontalScaler filled in when an
// AutoScalingPolicy is configured), the CN statefulset is fetched, and the pods
// are classified through ClassifyPodsByStatus, whose error is returned.
//
// If the statefulset cannot be fetched, a warning event is recorded and nil is
// returned, leaving the freshly built status without an AccessService.
func (cn *Controller) UpdateComponentStatus(cluster *dorisv1.DorisCluster) error {
	// If the spec does not exist the status is emptied; all CN resources must
	// be cleared before the status is cleared.
	if cluster.Spec.CnSpec == nil {
		cluster.Status.CnStatus = nil
		return nil
	}

	statefulSetName := dorisv1.GenerateComponentStatefulSetName(cluster, dorisv1.Component_CN)
	cs := &dorisv1.CnStatus{
		ComponentStatus: dorisv1.ComponentStatus{
			ComponentCondition: dorisv1.ComponentCondition{
				SubResourceName:    statefulSetName,
				Phase:              dorisv1.Reconciling,
				LastTransitionTime: metav1.NewTime(time.Now()),
			},
		},
	}

	if cluster.Spec.CnSpec.AutoScalingPolicy != nil {
		cs.HorizontalScaler = &dorisv1.HorizontalScaler{
			Version: cluster.Spec.CnSpec.AutoScalingPolicy.Version,
			Name:    cn.generateAutoScalerName(cluster),
		}
	}
	cluster.Status.CnStatus = cs

	// With an autoscaler the replica count is read from the live statefulset,
	// because the autoscaler updates the statefulset's replicas when the spec
	// does not set them.
	var est appv1.StatefulSet
	if err := cn.K8sclient.Get(context.Background(), types.NamespacedName{Namespace: cluster.Namespace, Name: statefulSetName}, &est); err != nil {
		cn.K8srecorder.Eventf(cluster, string(sub_controller.EventWarning), sub_controller.StatefulSetNotExist, "the cn statefulset %s not exist.", statefulSetName)
		return nil
	}

	// Spec.Replicas is a pointer; guard against nil instead of panicking and
	// fall back to 1, the Kubernetes default for an unspecified replica count.
	replicas := int32(1)
	if est.Spec.Replicas != nil {
		replicas = *est.Spec.Replicas
	}

	cs.AccessService = dorisv1.GenerateExternalServiceName(cluster, dorisv1.Component_CN)
	return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas, dorisv1.Component_CN)
}
// applyStatefulSet creates the statefulset st when it does not exist yet, or
// updates the existing one when st differs from it.
//
// autoscaler reports whether an AutoScalingPolicy is configured. It is kept
// for signature compatibility; whether replicas take part in the comparison is
// decided from st.Spec.Replicas alone, so a spec that leaves replicas unset is
// handled the same way with or without the flag.
func (cn *Controller) applyStatefulSet(ctx context.Context, st *appv1.StatefulSet, autoscaler bool) error {
	// Create the statefulset when it is not found; any other lookup error is
	// logged and returned.
	var est appv1.StatefulSet
	if err := cn.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
		return k8s.CreateClientObject(ctx, cn.K8sclient, st)
	} else if err != nil {
		klog.Errorf("CnController Sync create statefulset name=%s, namespace=%s error=%s", st.Name, st.Namespace, err.Error())
		return err
	}

	// When the desired spec leaves replicas unset, exclude replicas from the
	// comparison. Otherwise the live replica count (which this file's comments
	// say is maintained by the autoscaler) would always differ from the nil
	// desired value and trigger an update that pushes a nil replica count.
	excludeReplica := st.Spec.Replicas == nil

	// The comparison should exclude pvc: pvc is not allowed to be updated
	// through the statefulset spec.
	cn.RestrictConditionsEqual(st, &est)
	if resource.StatefulSetDeepEqual(st, &est, excludeReplica) {
		return nil
	}

	if st.Spec.Replicas != nil {
		// Replicas explicitly set: the user has cancelled the autoscaler, so
		// merge with the existing statefulset before updating.
		resource.MergeStatefulSets(st, est)
	} else {
		// Only carry over the resource version needed for the update.
		st.ResourceVersion = est.ResourceVersion
	}
	return k8s.UpdateClientObject(ctx, cn.K8sclient, st)
}
// deleteAutoScaler deletes the autoscaler recorded in dcr.Status.CnStatus and
// then sets HorizontalScaler to nil.
//
// It returns nil without doing anything when there is no CN status, no
// recorded HorizontalScaler, or the recorded name is empty. A not-found error
// from the delete call is treated as success; any other error is logged and
// returned, leaving the status untouched.
func (cn *Controller) deleteAutoScaler(ctx context.Context, dcr *dorisv1.DorisCluster) error {
	if dcr.Status.CnStatus == nil {
		return nil
	}

	// HorizontalScaler is a pointer and may be nil; check it before reading Name.
	scaler := dcr.Status.CnStatus.HorizontalScaler
	if scaler == nil || scaler.Name == "" {
		klog.V(4).Infof("cnController not need delete the autoScaler, namespace=%s, src name=%s.", dcr.Namespace, dcr.Name)
		return nil
	}

	autoScalerName := scaler.Name
	version := scaler.Version
	if err := k8s.DeleteAutoscaler(ctx, cn.K8sclient, dcr.Namespace, autoScalerName, version); err != nil && !apierrors.IsNotFound(err) {
		klog.Errorf("cnController sync deploy or delete failed, namespace=%s, autosclaer name=%s, autoscaler version=%s, error=%s", dcr.GetNamespace(), autoScalerName, version, err.Error())
		return err
	}

	dcr.Status.CnStatus.HorizontalScaler = nil
	return nil
}
// deployAutoScaler builds the horizontal pod autoscaler for the given policy,
// target statefulset and cluster, then creates or updates it. A failure is
// logged and returned.
func (cn *Controller) deployAutoScaler(ctx context.Context, policy dorisv1.AutoScalingPolicy, target *appv1.StatefulSet, dcr *dorisv1.DorisCluster) error {
	hpa := resource.BuildHorizontalPodAutoscaler(cn.buildCnAutoscalerParams(policy, target, dcr))

	err := k8s.CreateOrUpdateClientObject(ctx, cn.K8sclient, hpa)
	if err == nil {
		return nil
	}

	klog.Errorf("cnController deployAutoscaler failed, namespace=%s,name=%s,version=%s,error=%s", hpa.GetNamespace(), hpa.GetName(), policy.Version, err.Error())
	return err
}
// ClearResources removes CN resources that are no longer wanted.
//
// When the cluster has no CN status there is nothing to clear. Otherwise the
// autoscaler is deleted if the CN spec is gone, the spec has no
// AutoScalingPolicy, or the DorisCluster is being deleted; a deletion failure
// is reported as a warning event only. When the CN spec is gone the common CN
// resources are cleared as well.
//
// It always returns (true, nil).
func (cn *Controller) ClearResources(ctx context.Context, dcr *dorisv1.DorisCluster) (bool, error) {
	if dcr.Status.CnStatus == nil {
		klog.Info("Doris cluster is not have cn")
		return true, nil
	}

	// Sync calls this method precisely when CnSpec is nil, so the spec must be
	// nil-checked before AutoScalingPolicy is read.
	cnSpec := dcr.Spec.CnSpec
	if cnSpec == nil || cnSpec.AutoScalingPolicy == nil || !dcr.DeletionTimestamp.IsZero() {
		if err := cn.DeleteAutoscaler(ctx, dcr); err != nil {
			// The error text is passed as an argument rather than spliced into
			// the format string, so a '%' in it cannot corrupt the message.
			cn.K8srecorder.Eventf(dcr, string(sub_controller.EventWarning), sub_controller.AutoScalerDeleteFailed, "cn autoscaler deleted failed.%s", err.Error())
		}
	}

	if cnSpec == nil {
		// NOTE(review): any result of ClearCommonResources is not inspected
		// here; confirm whether its failures should be surfaced.
		cn.ClearCommonResources(ctx, dcr, dorisv1.Component_CN)
	}
	return true, nil
}
// DeleteAutoscaler deletes the autoscaler recorded in dcr.Status.CnStatus and
// replaces the recorded HorizontalScaler with an empty value.
//
// It returns nil without doing anything when there is no CN status, no
// recorded HorizontalScaler, or the recorded name is empty. A not-found error
// from the delete call is treated as success; any other error is logged and
// returned, leaving the status untouched.
func (cn *Controller) DeleteAutoscaler(ctx context.Context, dcr *dorisv1.DorisCluster) error {
	if dcr.Status.CnStatus == nil || dcr.Status.CnStatus.HorizontalScaler == nil {
		return nil
	}

	autoScalerName := dcr.Status.CnStatus.HorizontalScaler.Name
	version := dcr.Status.CnStatus.HorizontalScaler.Version
	// This method leaves an empty HorizontalScaler behind after a successful
	// delete; an empty name therefore means there is nothing left to delete.
	if autoScalerName == "" {
		return nil
	}

	if err := k8s.DeleteAutoscaler(ctx, cn.K8sclient, dcr.Namespace, autoScalerName, version); err != nil && !apierrors.IsNotFound(err) {
		klog.Errorf("cnController delete failed, namespace=%s, autosclaer name=%s, autoscaler version=%s, error=%s", dcr.GetNamespace(), autoScalerName, version, err.Error())
		return err
	}

	dcr.Status.CnStatus.HorizontalScaler = &dorisv1.HorizontalScaler{}
	return nil
}
// GetConfig resolves the CN configuration from the config maps referenced by
// configMapInfo in the given namespace.
//
// An empty, non-nil map is returned when no config map is mounted. A failure
// to fetch the config maps is logged, resolution is still attempted on
// whatever was fetched, and the fetch and resolve errors are merged into the
// returned error.
func (cn *Controller) GetConfig(ctx context.Context, configMapInfo *dorisv1.ConfigMapInfo, namespace string) (map[string]interface{}, error) {
	mounted := resource.GetMountConfigMapInfo(*configMapInfo)
	if len(mounted) < 1 {
		return map[string]interface{}{}, nil
	}

	fetched, fetchErr := k8s.GetConfigMaps(ctx, cn.K8sclient, namespace, mounted)
	if fetchErr != nil {
		klog.Errorf("CnController GetConfig get configmap failed, namespace: %s, err: %s \n", namespace, fetchErr.Error())
	}

	resolved, resolveErr := resource.ResolveConfigMaps(fetched, dorisv1.Component_CN)
	return resolved, utils.MergeError(fetchErr, resolveErr)
}
// getFeConfig resolves the FE configuration from the config maps referenced by
// configMapInfo in the given namespace.
//
// An empty, non-nil map is returned when no config map is mounted. A failure
// to fetch the config maps is logged, resolution is still attempted on
// whatever was fetched, and the fetch and resolve errors are merged into the
// returned error.
func (cn *Controller) getFeConfig(ctx context.Context, configMapInfo *dorisv1.ConfigMapInfo, namespace string) (map[string]interface{}, error) {
	mounted := resource.GetMountConfigMapInfo(*configMapInfo)
	if len(mounted) < 1 {
		return map[string]interface{}{}, nil
	}

	fetched, fetchErr := k8s.GetConfigMaps(ctx, cn.K8sclient, namespace, mounted)
	if fetchErr != nil {
		klog.Errorf("CnController GetFeConfig get configmap failed, namespace: %s, err: %s \n", namespace, fetchErr.Error())
	}

	resolved, resolveErr := resource.ResolveConfigMaps(fetched, dorisv1.Component_FE)
	return resolved, utils.MergeError(fetchErr, resolveErr)
}