pkg/controller/sub_controller/disaggregated_subcontroller.go (548 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package sub_controller
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/metadata"
"github.com/apache/doris-operator/pkg/common/utils/resource"
"github.com/apache/doris-operator/pkg/common/utils/set"
"github.com/spf13/viper"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"os"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)
const (
//doris use LOG_DIR as the key of log path. this update from 2.1.7
//metaservice use log_dir, when used please ignore case-sensitive
oldLogPathKey = "LOG_DIR"
newLogPathKey = "sys_log_dir"
FEMetaPathKey = "meta_dir"
FELogStoreName = "fe-log"
FEMetaStoreName = "fe-meta"
BELogStoreName = "be-log"
BECacheStorePreName = "be-storage"
MSLogStoreName = "ms-log"
DefaultCacheRootPath = "/opt/apache-doris/be/file_cache"
//default cache storage size: unit=B
DefaultCacheSize int64 = 107374182400
FileCachePathKey = "file_cache_path"
FileCacheSubConfigPathKey = "path"
FileCacheSubConfigTotalSizeKey = "total_size"
)
type DisaggregatedSubController interface {
//Sync reconcile for sub controller. bool represent the component have updated.
Sync(ctx context.Context, obj client.Object) error
//clear all resource about sub-component.
ClearResources(ctx context.Context, obj client.Object) (bool, error)
//return the controller name, beController, feController,cnController for log.
GetControllerName() string
//UpdateStatus update the component status on src.
UpdateComponentStatus(obj client.Object) error
}
type DisaggregatedSubDefaultController struct {
K8sclient client.Client
K8srecorder record.EventRecorder
ControllerName string
}
func (d *DisaggregatedSubDefaultController) GetConfigValuesFromConfigMaps(namespace string, resolveKey string, cms []v1.ConfigMap) map[string]interface{} {
if len(cms) == 0 {
return nil
}
for _, cm := range cms {
kcm, err := k8s.GetConfigMap(context.Background(), d.K8sclient, namespace, cm.Name)
if err != nil {
klog.Errorf("disaggregatedFEController getConfigValuesFromConfigMaps namespace=%s, name=%s, failed, err=%s", namespace, cm.Name, err.Error())
continue
}
if _, ok := kcm.Data[resolveKey]; !ok {
continue
}
v := kcm.Data[resolveKey]
return d.resolveStartConfig([]byte(v), resolveKey)
}
return nil
}
func (d *DisaggregatedSubDefaultController) resolveStartConfig(vb []byte, resolveKey string) map[string]interface{} {
switch resolveKey {
case resource.MS_RESOLVEKEY:
os.Setenv("DORIS_HOME", resource.DEFAULT_ROOT_PATH+"/ms")
case resource.FE_RESOLVEKEY:
os.Setenv("DORIS_HOME", resource.DEFAULT_ROOT_PATH+"/fe")
case resource.BE_RESOLVEKEY:
os.Setenv("DORIS_HOME", resource.DEFAULT_ROOT_PATH+"/be")
default:
}
viper.SetConfigType("properties")
viper.ReadConfig(bytes.NewBuffer(vb))
return viper.AllSettings()
}
// for config default values.
func (d *DisaggregatedSubDefaultController) NewDefaultService(ddc *v1.DorisDisaggregatedCluster) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: ddc.Namespace,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
APIVersion: ddc.APIVersion,
Kind: ddc.Kind,
Name: ddc.Name,
UID: ddc.UID,
},
},
},
Spec: corev1.ServiceSpec{
SessionAffinity: corev1.ServiceAffinityClientIP,
},
}
}
func (d *DisaggregatedSubDefaultController) NewDefaultStatefulset(ddc *v1.DorisDisaggregatedCluster) *appv1.StatefulSet {
return &appv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: ddc.Namespace,
OwnerReferences: []metav1.OwnerReference{resource.GetOwnerReference(ddc)},
},
Spec: appv1.StatefulSetSpec{
PodManagementPolicy: appv1.ParallelPodManagement,
RevisionHistoryLimit: metadata.GetInt32Pointer(5),
UpdateStrategy: appv1.StatefulSetUpdateStrategy{
Type: appv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appv1.RollingUpdateStatefulSetStrategy{
Partition: metadata.GetInt32Pointer(0),
},
},
},
}
}
func (d *DisaggregatedSubDefaultController) BuildDefaultConfigMapVolumesVolumeMounts(cms []v1.ConfigMap) ([]corev1.Volume, []corev1.VolumeMount) {
var vs []corev1.Volume
var vms []corev1.VolumeMount
for _, cm := range cms {
v := corev1.Volume{
Name: cm.Name,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cm.Name,
},
},
},
}
vs = append(vs, v)
vm := corev1.VolumeMount{
Name: cm.Name,
MountPath: cm.MountPath,
}
if vm.MountPath == "" {
vm.MountPath = resource.ConfigEnvPath
}
vms = append(vms, vm)
}
return vs, vms
}
func (d *DisaggregatedSubDefaultController) ConstructDefaultAffinity(matchKey, value string, ddcAffinity *corev1.Affinity) *corev1.Affinity {
affinity := d.newDefaultAffinity(matchKey, value)
if ddcAffinity == nil {
return affinity
}
ddcPodAntiAffinity := ddcAffinity.PodAntiAffinity
if ddcPodAntiAffinity != nil {
affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = ddcPodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, ddcPodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
}
affinity.NodeAffinity = ddcAffinity.NodeAffinity
affinity.PodAffinity = ddcAffinity.PodAffinity
return affinity
}
func (d *DisaggregatedSubDefaultController) newDefaultAffinity(matchKey, value string) *corev1.Affinity {
if matchKey == "" || value == "" {
return nil
}
podAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 20,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{Key: matchKey, Operator: metav1.LabelSelectorOpIn, Values: []string{value}},
},
},
TopologyKey: resource.NODE_TOPOLOGYKEY,
},
}
return &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{podAffinityTerm},
},
}
}
// the common logic to apply service, will used by fe,be,ms.
func (d *DisaggregatedSubDefaultController) DefaultReconcileService(ctx context.Context, svc *corev1.Service) (*Event, error) {
if err := k8s.ApplyService(ctx, d.K8sclient, svc, func(nsvc, osvc *corev1.Service) bool {
return resource.ServiceDeepEqualWithAnnoKey(nsvc, osvc, v1.DisaggregatedSpecHashValueAnnotation)
}); err != nil {
klog.Errorf("disaggregatedSubDefaultController reconcileService apply service namespace=%s name=%s failed, err=%s", svc.Namespace, svc.Name, err.Error())
return &Event{Type: EventWarning, Reason: ServiceApplyedFailed, Message: err.Error()}, err
}
return nil, nil
}
// generate map for mountpath:secret
func (d *DisaggregatedSubDefaultController) CheckSecretMountPath(ddc *v1.DorisDisaggregatedCluster, secrets []v1.Secret) {
var mountsMap = make(map[string]v1.Secret)
for _, secret := range secrets {
path := secret.MountPath
if s, exist := mountsMap[path]; exist {
klog.Errorf("CheckSecretMountPath error: the mountPath %s is repeated between secret: %s and secret: %s.", path, secret.SecretName, s.SecretName)
d.K8srecorder.Event(ddc, string(EventWarning), string(SecretPathRepeated), fmt.Sprintf("the mountPath %s is repeated between secret: %s and secret: %s.", path, secret.SecretName, s.SecretName))
}
mountsMap[path] = secret
}
}
// CheckSecretExist, check the secret exist or not in specify namespace.
func (d *DisaggregatedSubDefaultController) CheckSecretExist(ctx context.Context, ddc *v1.DorisDisaggregatedCluster, secrets []v1.Secret) {
errMessage := ""
for _, secret := range secrets {
var s corev1.Secret
if getErr := d.K8sclient.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: secret.SecretName}, &s); getErr != nil {
errMessage = errMessage + fmt.Sprintf("(name: %s, namespace: %s, err: %s), ", secret.SecretName, ddc.Namespace, getErr.Error())
}
}
if errMessage != "" {
klog.Errorf("CheckSecretExist error: %s.", errMessage)
d.K8srecorder.Event(ddc, string(EventWarning), string(SecretNotExist), fmt.Sprintf("CheckSecretExist error: %s.", errMessage))
}
}
// RestrictConditionsEqual adds two StatefulSet,
// It is used to control the conditions for comparing.
// nst StatefulSet - a new StatefulSet
// est StatefulSet - an old StatefulSet
func (d *DisaggregatedSubDefaultController) RestrictConditionsEqual(nst *appv1.StatefulSet, est *appv1.StatefulSet) {
//shield persistent volume update when the pvcProvider=Operator
//in webhook should intercept the volume spec updated when use statefulset pvc.
// TODO: updates to statefulset spec for fields other than 'replicas', 'template', 'updateStrategy', 'persistentVolumeClaimRetentionPolicy' and 'minReadySeconds' are forbidden
nst.Spec.VolumeClaimTemplates = est.Spec.VolumeClaimTemplates
}
func (d *DisaggregatedSubDefaultController) GetManagementAdminUserAndPWD(ctx context.Context, ddc *v1.DorisDisaggregatedCluster) (string, string) {
adminUserName := "root"
password := ""
if ddc.Spec.AuthSecret != "" {
secret, _ := k8s.GetSecret(ctx, d.K8sclient, ddc.Namespace, ddc.Spec.AuthSecret)
adminUserName, password = resource.GetDorisLoginInformation(secret)
} else if ddc.Spec.AdminUser != nil {
adminUserName = ddc.Spec.AdminUser.Name
password = ddc.Spec.AdminUser.Password
}
return adminUserName, password
}
func (d *DisaggregatedSubDefaultController) BuildVolumesVolumeMountsAndPVCs(confMap map[string]interface{}, componentType v1.DisaggregatedComponentType, commonSpec *v1.CommonSpec) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) {
if commonSpec.PersistentVolume == nil && len(commonSpec.PersistentVolumes) == 0 {
vs, vms := d.getEmptyDirVolumesVolumeMounts(confMap, componentType)
return vs, vms, nil
}
if commonSpec.PersistentVolume != nil {
return d.PersistentVolumeBuildVolumesVolumeMountsAndPVCs(commonSpec, confMap, componentType)
}
return d.PersistentVolumeArrayBuildVolumesVolumeMountsAndPVCs(commonSpec, confMap, componentType)
}
// the old config before 25.2.1, the requiredPaths should filter log path before call this function.
func (d *DisaggregatedSubDefaultController) PersistentVolumeBuildVolumesVolumeMountsAndPVCs(commonSpec *v1.CommonSpec, confMap map[string]interface{}, componentType v1.DisaggregatedComponentType) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) {
v1pv := commonSpec.PersistentVolume
if v1pv == nil {
return nil, nil, nil
}
pathName := map[string]string{} /*key=path, value=name*/
var requiredPaths []string
switch componentType {
case v1.DisaggregatedMS:
//if logNotStore anywhere is true, not build pvc.
if !commonSpec.PersistentVolume.LogNotStore && !commonSpec.LogNotStore {
logPath := d.getLogPath(confMap, componentType)
pathName[logPath] = MSLogStoreName
requiredPaths = append(requiredPaths, logPath)
}
case v1.DisaggregatedFE:
if !commonSpec.PersistentVolume.LogNotStore && !commonSpec.LogNotStore {
logPath := d.getLogPath(confMap, componentType)
pathName[logPath] = FELogStoreName
requiredPaths = append(requiredPaths, logPath)
}
metaPath := d.getFEMetaPath(confMap)
pathName[metaPath] = FEMetaStoreName
requiredPaths = append(requiredPaths, metaPath)
case v1.DisaggregatedBE:
if !commonSpec.PersistentVolume.LogNotStore && !commonSpec.LogNotStore {
logPath := d.getLogPath(confMap, componentType)
pathName[logPath] = BELogStoreName
requiredPaths = append(requiredPaths, logPath)
}
cachePaths, _ := d.getCacheMaxSizeAndPaths(confMap)
for i, _ := range cachePaths {
path_i := BECacheStorePreName + strconv.Itoa(i)
pathName[cachePaths[i]] = path_i
requiredPaths = append(requiredPaths, cachePaths[i])
}
//generate the last path's name, the ordinal is length of cache paths.
baseIndex := len(cachePaths)
for _, path := range v1pv.MountPaths {
if _, ok := pathName[path]; ok {
//compatible before name= storage+i
continue
}
requiredPaths = append(requiredPaths, path)
pathName[path] = BECacheStorePreName + strconv.Itoa(baseIndex)
baseIndex = baseIndex + 1
}
default:
}
var vs []corev1.Volume
var vms []corev1.VolumeMount
var pvcs []corev1.PersistentVolumeClaim
for _, path := range requiredPaths {
name := pathName[path]
vs = append(vs, corev1.Volume{Name: name, VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: name,
}}})
vms = append(vms, corev1.VolumeMount{Name: name, MountPath: path})
pvcs = append(pvcs, corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: v1pv.Annotations,
},
Spec: *v1pv.PersistentVolumeClaimSpec.DeepCopy(),
})
}
return vs, vms, pvcs
}
// use array of PersistentVolume, the new config from 25.2.x
func (d *DisaggregatedSubDefaultController) PersistentVolumeArrayBuildVolumesVolumeMountsAndPVCs(commonSpec *v1.CommonSpec, confMap map[string]interface{}, componentType v1.DisaggregatedComponentType) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) {
var requiredPaths []string
//find storage mountPaths.
switch componentType {
case v1.DisaggregatedFE:
metaPath := d.getFEMetaPath(confMap)
requiredPaths = append(requiredPaths, metaPath)
case v1.DisaggregatedBE:
cachePaths, _ := d.getCacheMaxSizeAndPaths(confMap)
requiredPaths = append(requiredPaths, cachePaths...)
default:
}
//check logNotStore, if true should not generate log pvc.
logNotStore := false
for _, v1pv := range commonSpec.PersistentVolumes {
if len(v1pv.MountPaths) != 0 {
requiredPaths = append(requiredPaths, v1pv.MountPaths...)
}
logNotStore = logNotStore || v1pv.LogNotStore
}
//the last check logNotStore, fist check config in any one of persistentVolumes.
if !logNotStore && !commonSpec.LogNotStore {
logPath := d.getLogPath(confMap, componentType)
requiredPaths = append(requiredPaths, logPath)
}
//generate name of persistentVolumeClaim use the mountPath
namePath := map[string]string{}
pathName := map[string]string{}
for _, path := range requiredPaths {
//use unix path separator.
sp := strings.Split(path, "/")
name := ""
for i := 1; i <= len(sp); i++ {
if sp[len(sp)-i] == "" {
continue
}
if name == "" {
name = sp[len(sp)-i]
} else {
name = sp[len(sp)-i] + "-" + name
}
if _, ok := namePath[name]; !ok {
break
}
}
namePath[name] = path
pathName[path] = name
}
pathPV := map[string]*v1.PersistentVolume{}
//the template index.
ti := -1
for i, v1pv := range commonSpec.PersistentVolumes {
if len(v1pv.MountPaths) == 0 {
ti = i
continue
}
for _, mp := range v1pv.MountPaths {
pathPV[mp] = &commonSpec.PersistentVolumes[i]
}
}
var vs []corev1.Volume
var vms []corev1.VolumeMount
var pvcs []corev1.PersistentVolumeClaim
//generate pvc from the last path in requiredPaths, the mountPath that configured by user is the highest wight, so first use the v1pv to generate pvc not template v1pv.
ss := set.NewSetString()
for i:= len(requiredPaths); i > 0; i-- {
path := requiredPaths[i -1]
//if the path have build volume, vm, pvc, skip it.
if ss.Find(path) {
continue
}
ss.Add(path)
pv, ok := pathPV[path]
name := pathName[path]
metadataName := strings.ReplaceAll(name, "_", "-")
//use specific PersistentVolume generate volume, vm, pvc
if ok {
vs = append(vs, corev1.Volume{Name: metadataName, VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: metadataName,
}}})
vms = append(vms, corev1.VolumeMount{Name: metadataName, MountPath: path})
pvcs = append(pvcs, corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: metadataName,
Annotations: pv.Annotations,
},
Spec: *pv.PersistentVolumeClaimSpec.DeepCopy(),
})
}
//use template PersistentVolume generate volume, vm, pvc
if !ok && ti != -1 {
vs = append(vs, corev1.Volume{Name: metadataName, VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: metadataName,
}}})
vms = append(vms, corev1.VolumeMount{Name: metadataName, MountPath: path})
pvcs = append(pvcs, corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: metadataName,
Annotations: commonSpec.PersistentVolumes[ti].Annotations,
},
Spec: *commonSpec.PersistentVolumes[ti].PersistentVolumeClaimSpec.DeepCopy(),
})
}
}
return vs, vms, pvcs
}
func (d *DisaggregatedSubDefaultController) getEmptyDirVolumesVolumeMounts(confMap map[string]interface{}, componentType v1.DisaggregatedComponentType) ([]corev1.Volume, []corev1.VolumeMount) {
switch componentType {
case v1.DisaggregatedMS:
return d.getMSEmptyDirVolumesVolumeMounts(confMap)
case v1.DisaggregatedFE:
return d.getFEEmptyDirVolumesVolumeMounts(confMap)
case v1.DisaggregatedBE:
return d.getBEEmptyDirVolumesVolumeMounts(confMap)
default:
return nil, nil
}
}
func (d *DisaggregatedSubDefaultController) getBEEmptyDirVolumesVolumeMounts(confMap map[string]interface{}) ([]corev1.Volume, []corev1.VolumeMount) {
vs := []corev1.Volume{
{
Name: BELogStoreName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
}
vms := []corev1.VolumeMount{
{
Name: BELogStoreName,
MountPath: d.getLogPath(confMap, v1.DisaggregatedBE),
},
}
cachePaths, _ := d.getCacheMaxSizeAndPaths(confMap)
for i, path := range cachePaths {
vs = append(vs, corev1.Volume{
Name: BECacheStorePreName + strconv.Itoa(i),
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
})
vms = append(vms, corev1.VolumeMount{
Name: BECacheStorePreName + strconv.Itoa(i),
MountPath: path,
})
}
return vs, vms
}
func (d *DisaggregatedSubDefaultController) getCacheMaxSizeAndPaths(cvs map[string]interface{}) ([]string, int64) {
v := cvs[FileCachePathKey]
if v == nil {
return []string{DefaultCacheRootPath}, DefaultCacheSize
}
var paths []string
var maxCacheSize int64
vbys := v.(string)
var pa []map[string]interface{}
err := json.Unmarshal([]byte(vbys), &pa)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController getStorageMaxSizeAndPaths json unmarshal file_cache_path failed, err=%s", err.Error())
return []string{}, 0
}
for i, mp := range pa {
pv := mp[FileCacheSubConfigPathKey]
pv_str, ok := pv.(string)
if !ok {
klog.Errorf("disaggregatedComputeGroupsController getStorageMaxSizeAndPaths index %d have not path config.", i)
continue
}
paths = append(paths, pv_str)
cache_v := mp[FileCacheSubConfigTotalSizeKey]
fc_size, ok := cache_v.(float64)
cache_size := int64(fc_size)
if !ok {
klog.Errorf("disaggregatedComputeGroupsController getStorageMaxSizeAndPaths index %d total_size is not number.", i)
continue
}
if maxCacheSize < cache_size {
maxCacheSize = cache_size
}
}
return paths, maxCacheSize
}
// use emptyDir mode generate metaservice use volume and volumeMount.
func (d *DisaggregatedSubDefaultController) getMSEmptyDirVolumesVolumeMounts(confMap map[string]interface{}) ([]corev1.Volume, []corev1.VolumeMount) {
vs := []corev1.Volume{
{
Name: MSLogStoreName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
}
vms := []corev1.VolumeMount{
{
Name: MSLogStoreName,
MountPath: d.getLogPath(confMap, v1.DisaggregatedMS),
},
}
return vs, vms
}
// use emptyDir mode generate fe use volume and volumeMount.
func (d *DisaggregatedSubDefaultController) getFEEmptyDirVolumesVolumeMounts(confMap map[string]interface{}) ([]corev1.Volume, []corev1.VolumeMount) {
vs := []corev1.Volume{
{
Name: FELogStoreName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
Name: FEMetaStoreName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
}
vms := []corev1.VolumeMount{
{
Name: FELogStoreName,
MountPath: d.getLogPath(confMap, v1.DisaggregatedFE),
},
{
Name: FEMetaStoreName,
MountPath: d.getFEMetaPath(confMap),
},
}
return vs, vms
}
// confMap, convert use viper, the viper will convert key to lowercase.
func (d *DisaggregatedSubDefaultController) getLogPath(confMap map[string]interface{}, componentType v1.DisaggregatedComponentType) string {
v := confMap[oldLogPathKey]
if v != nil {
return v.(string)
}
v = confMap[newLogPathKey]
if v != nil {
return v.(string)
}
//return default log path.
switch componentType {
case v1.DisaggregatedMS:
return resource.DEFAULT_ROOT_PATH + "/ms/log"
case v1.DisaggregatedFE:
return resource.DEFAULT_ROOT_PATH + "/fe/log"
case v1.DisaggregatedBE:
return resource.DEFAULT_ROOT_PATH + "/be/log"
default:
return ""
}
}
func (d *DisaggregatedSubDefaultController) getFEMetaPath(confMap map[string]interface{}) string {
v := confMap[FEMetaPathKey]
if v == nil {
return resource.DEFAULT_ROOT_PATH + "/fe/doris-meta"
}
return v.(string)
}