pkg/controller/sub_controller/be/pod.go (121 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package be import ( "context" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" "strconv" v1 "github.com/apache/doris-operator/api/doris/v1" "github.com/apache/doris-operator/pkg/common/utils/resource" corev1 "k8s.io/api/core/v1" ) func (be *Controller) buildBEPodTemplateSpec(dcr *v1.DorisCluster, config map[string]interface{}) corev1.PodTemplateSpec { podTemplateSpec := resource.NewPodTemplateSpec(dcr, config, v1.Component_BE) //if enable fe affinity, should not add fe antiAffinity and set the weight of affinity less than be antiAffinity. if dcr.Spec.BeSpec.EnableFeAffinity == true { be.addFeAffinity(&podTemplateSpec) } else { be.addFeAntiAffinity(&podTemplateSpec) } be.addTerminationGracePeriodSeconds(dcr, &podTemplateSpec) var containers []corev1.Container containers = append(containers, podTemplateSpec.Spec.Containers...) beContainer := be.beContainer(dcr) containers = append(containers, beContainer) if dcr.Spec.BeSpec.EnableWorkloadGroup { if dcr.Spec.BeSpec.ContainerSecurityContext == nil { dcr.Spec.BeSpec.ContainerSecurityContext = &corev1.SecurityContext{} } dcr.Spec.BeSpec.ContainerSecurityContext.Privileged = pointer.Bool(true) } containers = resource.ApplySecurityContext(containers, dcr.Spec.BeSpec.ContainerSecurityContext) podTemplateSpec.Spec.Containers = containers return podTemplateSpec } // @Notice, the logic is error, should use MatchExpressions not matchLabels, the label used for select nodes, and the key:value "kubernetes.io/hostname=fe" is not exist in default k8s without assign to node by manual. // although, the code is not harmless, so for stable the codes not need deleted. // be pods add fe anti affinity for prefer deploy fe and be on different nodes. func (be *Controller) addFeAntiAffinity(tplSpec *corev1.PodTemplateSpec) { preferedScheduleTerm := corev1.WeightedPodAffinityTerm{ Weight: 80, PodAffinityTerm: corev1.PodAffinityTerm{ TopologyKey: resource.NODE_TOPOLOGYKEY, LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ resource.NODE_TOPOLOGYKEY: string(v1.Component_FE), }, }, }, } if tplSpec.Spec.Affinity == nil { tplSpec.Spec.Affinity = &corev1.Affinity{} } if tplSpec.Spec.Affinity.PodAntiAffinity == nil { tplSpec.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{} } tplSpec.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(tplSpec.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, preferedScheduleTerm) } // add fe affinity for be, wish the fe and be will 1:1 deployed in same node. func (be *Controller) addFeAffinity(tplSpec *corev1.PodTemplateSpec) { pst := corev1.WeightedPodAffinityTerm{ // the weight of be antiAffinity with be is 20. Weight: 15, PodAffinityTerm: corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { Key: v1.ComponentLabelKey, Operator: metav1.LabelSelectorOpIn, Values: []string{string(v1.Component_FE)}, }, }, }, TopologyKey: resource.NODE_TOPOLOGYKEY, }, } if tplSpec.Spec.Affinity == nil { tplSpec.Spec.Affinity = &corev1.Affinity{} } if tplSpec.Spec.Affinity.PodAffinity == nil { tplSpec.Spec.Affinity.PodAffinity = &corev1.PodAffinity{} } tplSpec.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(tplSpec.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution, pst) } func (be *Controller) beContainer(dcr *v1.DorisCluster) corev1.Container { config, _ := be.GetConfig(context.Background(), &dcr.Spec.BeSpec.ConfigMapInfo, dcr.Namespace, v1.Component_BE) c := resource.NewBaseMainContainer(dcr, config, v1.Component_BE) addr, port := v1.GetConfigFEAddrForAccess(dcr, v1.Component_BE) var feConfig map[string]interface{} //if fe addr not config, we should use external service as addr and port get from fe config. if addr == "" { if dcr.Spec.FeSpec != nil { feConfig, _ = be.GetConfig(context.Background(), &dcr.Spec.FeSpec.ConfigMapInfo, dcr.Namespace, v1.Component_FE) } addr = v1.GenerateExternalServiceName(dcr, v1.Component_FE) } feQueryPort := strconv.FormatInt(int64(resource.GetPort(feConfig, resource.QUERY_PORT)), 10) if port != -1 { feQueryPort = strconv.FormatInt(int64(port), 10) } ports := resource.GetContainerPorts(config, v1.Component_BE) c.Name = "be" c.Ports = append(c.Ports, ports...) c.Env = append(c.Env, corev1.EnvVar{ Name: resource.ENV_FE_ADDR, Value: addr, }, corev1.EnvVar{ Name: resource.ENV_FE_PORT, Value: feQueryPort, }) if dcr.Spec.BeSpec.EnableWorkloadGroup { c.Env = append(c.Env, corev1.EnvVar{ Name: resource.ENABLE_WORKLOAD_GROUP, Value: fmt.Sprintf("%t", dcr.Spec.BeSpec.EnableWorkloadGroup), }) } return c } // Only configure the TerminationGracePeriodSeconds when grace_shutdown_wait_seconds configured in be.conf func (be *Controller) addTerminationGracePeriodSeconds(dcr *v1.DorisCluster, tplSpec *corev1.PodTemplateSpec) { config, _ := be.GetConfig(context.Background(), &dcr.Spec.BeSpec.ConfigMapInfo, dcr.Namespace, v1.Component_BE) seconds := resource.GetTerminationGracePeriodSeconds(config) if seconds > 0 { tplSpec.Spec.TerminationGracePeriodSeconds = &seconds return } return }