pkg/controller/disaggregated_cluster_controller.go (240 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controller
import (
"context"
"errors"
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/hash"
sc "github.com/apache/doris-operator/pkg/controller/sub_controller"
dcgs "github.com/apache/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/computegroups"
dfe "github.com/apache/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe"
"github.com/apache/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/metaservice"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"os"
ctrl "sigs.k8s.io/controller-runtime"
controller_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"
)
var (
_ reconcile.Reconciler = &DisaggregatedClusterReconciler{}
_ Controller = &DisaggregatedClusterReconciler{}
)
var (
disaggregatedClusterController = "disaggregatedClusterController"
)
type DisaggregatedClusterReconciler struct {
client.Client
Recorder record.EventRecorder
Scheme *runtime.Scheme
Scs map[string]sc.DisaggregatedSubController
//record configmap response instance. key: configMap namespacedName, value: DorisDisaggregatedCluster namespacedName
//wcms map[string]string
}
func (dc *DisaggregatedClusterReconciler) Init(mgr ctrl.Manager, options *Options) {
//wcms := make(map[string]string)
scs := make(map[string]sc.DisaggregatedSubController)
msc := metaservice.New(mgr)
scs[msc.GetControllerName()] = msc
dfec := dfe.New(mgr)
scs[dfec.GetControllerName()] = dfec
dccsc := dcgs.New(mgr)
scs[dccsc.GetControllerName()] = dccsc
if err := (&DisaggregatedClusterReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(disaggregatedClusterController),
Scs: scs,
//wcms: wcms,
}).SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller ", "disaggregatedClusterReconciler")
os.Exit(1)
}
if options.EnableWebHook {
if err := (&dv1.DorisDisaggregatedCluster{}).SetupWebhookWithManager(mgr); err != nil {
klog.Error(err, " unable to create unnamedwatches ", " controller ", " DorisDisaggregatedCluster ")
os.Exit(1)
}
}
}
func (dc *DisaggregatedClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
builder := dc.resourceBuilder(ctrl.NewControllerManagedBy(mgr))
builder = dc.watchPodBuilder(builder)
//builder = dc.watchConfigMapBuilder(builder)
return builder.Complete(dc)
}
func (dc *DisaggregatedClusterReconciler) watchPodBuilder(builder *ctrl.Builder) *ctrl.Builder {
mapFn := handler.EnqueueRequestsFromMapFunc(
func(ctx context.Context, a client.Object) []reconcile.Request {
labels := a.GetLabels()
dorisName := labels[dv1.DorisDisaggregatedClusterName]
if dorisName != "" {
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: dorisName,
Namespace: a.GetNamespace(),
}},
}
}
return nil
})
p := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
if _, ok := e.Object.GetLabels()[dv1.DorisDisaggregatedClusterName]; !ok {
return false
}
return true
},
UpdateFunc: func(u event.UpdateEvent) bool {
if _, ok := u.ObjectOld.GetLabels()[dv1.DorisDisaggregatedClusterName]; !ok {
return false
}
return u.ObjectOld != u.ObjectNew
},
DeleteFunc: func(d event.DeleteEvent) bool {
if _, ok := d.Object.GetLabels()[dv1.DorisDisaggregatedClusterName]; !ok {
return false
}
return true
},
}
return builder.Watches(&corev1.Pod{},
mapFn, controller_builder.WithPredicates(p))
}
//func (dc *DisaggregatedClusterReconciler) watchConfigMapBuilder(builder *ctrl.Builder) *ctrl.Builder {
// mapFn := handler.EnqueueRequestsFromMapFunc(
// func(a client.Object) []reconcile.Request {
// namespace := a.GetNamespace()
// name := a.GetName()
// cmnn := types.NamespacedName{Namespace: namespace, Name: name}
// cmnnStr := cmnn.String()
// if ddc, ok := dc.wcms[cmnnStr]; ok {
// nna := strings.Split(ddc, "/")
// // not run only for code standard
// if len(nna) != 2 {
// return nil
// }
//
// return []reconcile.Request{{NamespacedName: types.NamespacedName{
// Namespace: nna[0],
// Name: nna[1],
// }}}
// }
// return nil
// })
//
// p := predicate.Funcs{
// UpdateFunc: func(u event.UpdateEvent) bool {
// ns := u.ObjectNew.GetNamespace()
// name := u.ObjectNew.GetName()
// nsn := ns + "/" + name
// _, ok := dc.wcms[nsn]
// return ok
// },
// }
//
// return builder.Watches(&source.Kind{Type: &corev1.ConfigMap{}},
// mapFn, controller_builder.WithPredicates(p))
//}
func (dc *DisaggregatedClusterReconciler) resourceBuilder(builder *ctrl.Builder) *ctrl.Builder {
return builder.For(&dv1.DorisDisaggregatedCluster{}).
Owns(&appv1.StatefulSet{}).
Owns(&corev1.Service{})
}
// Reconcile steps:
// 1. check and register instance info. info register in memory. periodical sync.
// 2. sync resource.
// 3. clear need delete resource.
// 4. display new status(eorganize status, update cr or status)
func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
var ddc dv1.DorisDisaggregatedCluster
err := dc.Get(ctx, req.NamespacedName, &ddc)
if apierrors.IsNotFound(err) {
klog.Warningf("disaggreatedClusterReconciler not find resource DorisDisaggregatedCluster namespaceName %s", req.NamespacedName)
return ctrl.Result{}, nil
}
hv := hash.HashObject(ddc.Spec)
var res ctrl.Result
var msg string
reconRes, reconErr := dc.reconcileSub(ctx, &ddc)
if reconErr != nil {
msg = msg + reconErr.Error()
}
if !reconRes.IsZero() {
res = reconRes
}
// clear unused resources.
clearRes, clearErr := dc.clearUnusedResources(ctx, &ddc)
if clearErr != nil {
msg = msg + reconErr.Error()
}
if !clearRes.IsZero() {
res = clearRes
}
//display new status.
disRes, disErr := func() (ctrl.Result, error) {
//reorganize status.
var stsRes ctrl.Result
var stsErr error
if stsRes, stsErr = dc.reorganizeStatus(&ddc); stsErr != nil {
return stsRes, stsErr
}
//update cr or status
if stsRes, stsErr = dc.updateObjectORStatus(ctx, &ddc, hv); stsErr != nil {
return stsRes, stsErr
}
return stsRes, stsErr
}()
if disErr != nil {
msg = msg + disErr.Error()
}
if !disRes.IsZero() {
res = disRes
}
if msg != "" {
return res, errors.New(msg)
}
return res, nil
}
func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
for _, subC := range dc.Scs {
subC.ClearResources(ctx, ddc)
}
return ctrl.Result{}, nil
}
func (dc *DisaggregatedClusterReconciler) reorganizeStatus(ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
for _, sc := range dc.Scs {
//update component status.
if err := sc.UpdateComponentStatus(ddc); err != nil {
klog.Errorf("DorisClusterReconciler reconcile update component %s status failed.err=%s\n", sc.GetControllerName(), err.Error())
return requeueIfError(err)
}
}
ddc.Status.ClusterHealth.Health = dv1.Green
if ddc.Status.FEStatus.AvailableStatus != dv1.Available || ddc.Status.ClusterHealth.CGAvailableCount <= (ddc.Status.ClusterHealth.CGCount/2) {
ddc.Status.ClusterHealth.Health = dv1.Red
} else if ddc.Status.FEStatus.Phase != dv1.Ready || ddc.Status.ClusterHealth.CGAvailableCount < ddc.Status.ClusterHealth.CGCount {
ddc.Status.ClusterHealth.Health = dv1.Yellow
}
return ctrl.Result{}, nil
}
func (dc *DisaggregatedClusterReconciler) reconcileSub(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
// recall all sub for check error.
errs := []error{}
for _, subC := range dc.Scs {
if err := subC.Sync(ctx, ddc); err != nil {
klog.Errorf("disaggreatedClusterReconciler sub reconciler %s sync err=%s.", subC.GetControllerName(), err.Error())
errs = append(errs, err)
}
}
if len(errs) != 0 {
msg := ""
for _, err := range errs {
msg += err.Error()
}
return ctrl.Result{}, errors.New(msg)
}
return ctrl.Result{}, nil
}
// when spec revert by operator should update cr or directly update status.
func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, preHv string) (ctrl.Result, error) {
postHv := hash.HashObject(ddc.Spec)
deepCopyDDC := ddc.DeepCopy()
if preHv != postHv {
var eddc dv1.DorisDisaggregatedCluster
if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err == nil || !apierrors.IsNotFound(err) {
if eddc.ResourceVersion != "" {
ddc.ResourceVersion = eddc.ResourceVersion
}
}
if err := dc.Update(ctx, ddc); err != nil {
klog.Errorf("disaggreatedClusterReconciler update DorisDisaggregatedCluster namespace %s name %s failed, err=%s", ddc.Namespace, ddc.Name, err.Error())
//return ctrl.Result{}, err
}
}
res, err := dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)
if err != nil {
return res, err
}
//if decommissioning, be is migrating data should wait it over, so return reconciling after 10 seconds.
for _, cgs := range ddc.Status.ComputeGroupStatuses {
if cgs.Phase == dv1.Decommissioning {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
}
// If the cluster status is abnormal(Health is not Green), reconciling is required.
if ddc.Status.ClusterHealth.Health != dv1.Green {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
return res, nil
}
func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
var eddc dv1.DorisDisaggregatedCluster
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err != nil {
return err
}
ddc.Status.DeepCopyInto(&eddc.Status)
return dc.Status().Update(ctx, &eddc)
}); err != nil {
klog.Errorf("updateDorisDisaggregatedClusterStatus update status failed err: %s", err.Error())
}
// if the status is not equal before reconcile and now the status is not available we should requeue.
if !disAggregatedInconsistentStatus(&ddc.Status, &eddc) {
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
return ctrl.Result{}, nil
}