pkg/common/utils/resource/pod.go (987 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package resource
import (
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
v1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/apache/doris-operator/pkg/common/utils/kerberos"
"github.com/apache/doris-operator/pkg/common/utils/set"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"strconv"
"strings"
)
const (
config_env_path = "/etc/doris"
ConfigEnvPath = config_env_path
secret_config_path = config_env_path
config_env_name = "CONFIGMAP_MOUNT_PATH"
basic_auth_path = "/etc/basic_auth"
auth_volume_name = "basic-auth"
be_storage_name = "be-storage"
be_storage_path = "/opt/apache-doris/be/storage"
fe_meta_path = "/opt/apache-doris/fe/doris-meta"
fe_meta_name = "fe-meta"
keytab_volume_name = "keytab"
keytab_default_mount_path = "/etc/keytab"
krb5_volume_name = "krb5"
krb5_default_mount_path = "/etc/krb5"
HEALTH_API_PATH = "/api/health"
HEALTH_BROKER_LIVE_COMMAND = "/opt/apache-doris/broker_is_alive.sh"
FE_PRESTOP = "/opt/apache-doris/fe_prestop.sh"
BE_PRESTOP = "/opt/apache-doris/be_prestop.sh"
BROKER_PRESTOP = "/opt/apache-doris/broker_prestop.sh"
//keys for pod env variables
POD_NAME = "POD_NAME"
POD_IP = "POD_IP"
HOST_IP = "HOST_IP"
POD_NAMESPACE = "POD_NAMESPACE"
ADMIN_USER = "USER"
ADMIN_PASSWD = "PASSWD"
DORIS_ROOT_KEY = "DORIS_ROOT"
KRB5_MOUNT_PATH = "KRB5_MOUNT_PATH"
KRB5_CONFIG = "KRB5_CONFIG"
KEYTAB_MOUNT_PATH = "KEYTAB_MOUNT_PATH"
KEYTAB_FINAL_USED_PATH = "KEYTAB_FINAL_USED_PATH"
DEFAULT_ADMIN_USER = "root"
DEFAULT_ROOT_PATH = "/opt/apache-doris"
POD_INFO_PATH = "/etc/podinfo"
POD_INFO_VOLUME_NAME = "podinfo"
NODE_TOPOLOGYKEY = "kubernetes.io/hostname"
DEFAULT_INIT_IMAGE = "selectdb/alpine:latest"
HEALTH_DISAGGREGATED_FE_PROBE_COMMAND = "/opt/apache-doris/fe_disaggregated_probe.sh"
HEALTH_DISAGGREGATED_BE_PROBE_COMMAND = "/opt/apache-doris/be_disaggregated_probe.sh"
HEALTH_DISAGGREGATED_MS_PROBE_COMMAND = "/opt/apache-doris/ms_disaggregated_probe.sh"
DISAGGREGATED_LIVE_PARAM_ALIVE = "alive"
DISAGGREGATED_LIVE_PARAM_READY = "ready"
)
type ProbeType string
var (
HttpGet ProbeType = "httpGet"
TcpSocket ProbeType = "tcpSocket"
Exec ProbeType = "exec"
)
func NewPodTemplateSpec(dcr *v1.DorisCluster, config map[string]interface{}, componentType v1.ComponentType) corev1.PodTemplateSpec {
spec := getBaseSpecFromCluster(dcr, componentType)
var volumes []corev1.Volume
var si *v1.SystemInitialization
var dcrAffinity *corev1.Affinity
var defaultInitContainers []corev1.Container
var SecurityContext *corev1.PodSecurityContext
var skipInit bool
sharedVolumes, _, sharedPaths := BuildSharedVolumesAndVolumeMounts(dcr.Spec.SharedPersistentVolumeClaims, componentType)
switch componentType {
case v1.Component_FE:
volumes = newVolumesFromBaseSpec(dcr.Spec.FeSpec.BaseSpec, sharedPaths, config, componentType)
si = dcr.Spec.FeSpec.BaseSpec.SystemInitialization
dcrAffinity = dcr.Spec.FeSpec.BaseSpec.Affinity
SecurityContext = dcr.Spec.FeSpec.BaseSpec.SecurityContext
case v1.Component_BE:
volumes = newVolumesFromBaseSpec(dcr.Spec.BeSpec.BaseSpec, sharedPaths, config, componentType)
si = dcr.Spec.BeSpec.BaseSpec.SystemInitialization
dcrAffinity = dcr.Spec.BeSpec.BaseSpec.Affinity
SecurityContext = dcr.Spec.BeSpec.BaseSpec.SecurityContext
skipInit = dcr.Spec.BeSpec.SkipDefaultSystemInit
case v1.Component_CN:
si = dcr.Spec.CnSpec.BaseSpec.SystemInitialization
dcrAffinity = dcr.Spec.CnSpec.BaseSpec.Affinity
SecurityContext = dcr.Spec.CnSpec.BaseSpec.SecurityContext
skipInit = dcr.Spec.CnSpec.SkipDefaultSystemInit
case v1.Component_Broker:
si = dcr.Spec.BrokerSpec.BaseSpec.SystemInitialization
dcrAffinity = dcr.Spec.BrokerSpec.BaseSpec.Affinity
SecurityContext = dcr.Spec.BrokerSpec.BaseSpec.SecurityContext
default:
klog.Errorf("NewPodTemplateSpec dorisClusterName %s, namespace %s componentType %s not supported.", dcr.Name, dcr.Namespace, componentType)
}
if len(volumes) == 0 {
volumes, _ = getDefaultVolumesVolumeMounts(componentType)
}
//map pod labels and annotations into pod
volumes, _ = appendPodInfoVolumesVolumeMounts(volumes, nil)
if dcr.Spec.AuthSecret != "" {
volumes = append(volumes, corev1.Volume{
Name: auth_volume_name,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: dcr.Spec.AuthSecret,
},
},
})
}
if len(GetMountConfigMapInfo(spec.ConfigMapInfo)) != 0 {
configVolumes, _ := getMultiConfigVolumeAndVolumeMount(&spec.ConfigMapInfo, componentType)
volumes = append(volumes, configVolumes...)
}
if len(spec.Secrets) != 0 {
secretVolumes, _ := getMultiSecretVolumeAndVolumeMount(spec, componentType)
volumes = append(volumes, secretVolumes...)
}
if dcr.Spec.KerberosInfo != nil {
kerberosVolumes, _ := getKerberosVolumeAndVolumeMount(dcr.Spec.KerberosInfo)
volumes = append(volumes, kerberosVolumes...)
}
if len(sharedVolumes) != 0 {
volumes = append(volumes, sharedVolumes...)
}
pts := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: GeneratePodTemplateName(dcr, componentType),
Annotations: spec.Annotations,
Labels: v1.GetPodLabels(dcr, componentType),
},
Spec: corev1.PodSpec{
ImagePullSecrets: spec.ImagePullSecrets,
NodeSelector: spec.NodeSelector,
Volumes: volumes,
ServiceAccountName: spec.ServiceAccount,
Affinity: spec.Affinity.DeepCopy(),
Tolerations: spec.Tolerations,
HostAliases: spec.HostAliases,
InitContainers: defaultInitContainers,
SecurityContext: SecurityContext,
},
}
constructInitContainers(skipInit, componentType, &pts.Spec, si)
pts.Spec.Affinity = constructAffinity(dcrAffinity, componentType)
return pts
}
// for disaggregated cluster.
func NewPodTemplateSpecWithCommonSpec(cs *dv1.CommonSpec, componentType dv1.DisaggregatedComponentType) corev1.PodTemplateSpec {
var vs []corev1.Volume
si := cs.SystemInitialization
var defaultInitContainers []corev1.Container
vs, _ = appendPodInfoVolumesVolumeMounts(vs, nil)
pts := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: strings.ToLower(string(componentType)),
Annotations: cs.Annotations,
Labels: cs.Labels,
},
Spec: corev1.PodSpec{
ImagePullSecrets: cs.ImagePullSecrets,
NodeSelector: cs.NodeSelector,
ServiceAccountName: cs.ServiceAccount,
Affinity: cs.Affinity.DeepCopy(),
Tolerations: cs.Tolerations,
HostAliases: cs.HostAliases,
InitContainers: defaultInitContainers,
SecurityContext: cs.SecurityContext,
Volumes: vs,
},
}
constructDisaggregatedInitContainers(componentType, &pts.Spec, si)
return pts
}
// build disaggregated node(fe,be) container.
func NewContainerWithCommonSpec(cs *dv1.CommonSpec) corev1.Container {
var vms []corev1.VolumeMount
_, vms = appendPodInfoVolumesVolumeMounts(nil, vms)
c := corev1.Container{
Image: cs.Image,
SecurityContext: cs.ContainerSecurityContext,
Resources: cs.ResourceRequirements,
VolumeMounts: vms,
}
return c
}
// ApplySecurityContext applies the container security context to all containers in the pod (if not already set).
func ApplySecurityContext(containers []corev1.Container, securityContext *corev1.SecurityContext) []corev1.Container {
if securityContext == nil {
return containers
}
for i := range containers {
if containers[i].SecurityContext == nil {
containers[i].SecurityContext = securityContext
} else {
klog.Info("SecurityContext already exists in container" + containers[i].Name + "! Not overwriting it.")
}
}
return containers
}
func constructInitContainers(skipInit bool, componentType v1.ComponentType, podSpec *corev1.PodSpec, si *v1.SystemInitialization) {
defaultImage := ""
var defaultInitContains []corev1.Container
if si != nil {
initContainer := newBaseInitContainer("init", si)
defaultImage = si.InitImage
defaultInitContains = append(defaultInitContains, initContainer)
}
// the init containers have sequence,should confirm use initial is always in the first priority.
if !skipInit && (componentType == v1.Component_BE || componentType == v1.Component_CN) {
podSpec.InitContainers = append(podSpec.InitContainers, constructBeDefaultInitContainer(defaultImage))
}
podSpec.InitContainers = append(podSpec.InitContainers, defaultInitContains...)
}
func constructDisaggregatedInitContainers(componentType dv1.DisaggregatedComponentType, podSpec *corev1.PodSpec, si *dv1.SystemInitialization) {
initImage := DEFAULT_INIT_IMAGE
var defaultInitContains []corev1.Container
if si != nil {
enablePrivileged := true
if si.InitImage != "" {
initImage = si.InitImage
}
initContainer := corev1.Container{
Image: initImage,
Name: "init",
Command: si.Command,
ImagePullPolicy: corev1.PullIfNotPresent,
Args: si.Args,
SecurityContext: &corev1.SecurityContext{
Privileged: &enablePrivileged,
},
}
si.InitImage = initImage
defaultInitContains = append(defaultInitContains, initContainer)
}
// the init containers have sequence,should confirm use initial is always in the first priority.
if componentType == dv1.DisaggregatedBE {
podSpec.InitContainers = append(podSpec.InitContainers, constructBeDefaultInitContainer(initImage))
}
podSpec.InitContainers = append(podSpec.InitContainers, defaultInitContains...)
}
// newVolumesFromBaseSpec return corev1.Volume build from baseSpec.
func newVolumesFromBaseSpec(spec v1.BaseSpec, sharedPaths []string, config map[string]interface{}, componentType v1.ComponentType) []corev1.Volume {
var volumes []corev1.Volume
dorisPersistentVolumes, _ := GenerateEveryoneMountPathDorisPersistentVolume(&spec, sharedPaths, config, componentType)
for _, dorisPersistentVolume := range dorisPersistentVolumes {
var volume corev1.Volume
volume.Name = dorisPersistentVolume.Name
volume.VolumeSource = corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: dorisPersistentVolume.Name,
},
}
volumes = append(volumes, volume)
}
return volumes
}
func BuildSharedVolumesAndVolumeMounts(spvcs []v1.SharedPersistentVolumeClaim, componentType v1.ComponentType) ([]corev1.Volume, []corev1.VolumeMount, []string) {
if len(spvcs) == 0 {
return nil, nil, nil
}
var volumes []corev1.Volume
var mounts []corev1.VolumeMount
var paths []string
home := getDefaultDorisHome(componentType)
for _, claim := range spvcs {
path := strings.ReplaceAll(claim.MountPath, "$DORIS_HOME", home)
if strings.HasSuffix(path, "/") {
path = path[:len(path)-1]
}
if len(claim.SupportComponents) == 0 || set.ArrayContains(claim.SupportComponents, componentType) {
volumes = append(volumes, corev1.Volume{
Name: claim.PersistentVolumeClaimName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claim.PersistentVolumeClaimName,
},
},
})
mounts = append(mounts, corev1.VolumeMount{
Name: claim.PersistentVolumeClaimName,
MountPath: path,
})
paths = append(paths, path)
}
}
return volumes, mounts, paths
}
// buildVolumeMounts construct all volumeMounts contains default volumeMounts if persistentVolumes not definition.
func buildVolumeMounts(spec v1.BaseSpec, sharedPaths []string, config map[string]interface{}, componentType v1.ComponentType) []corev1.VolumeMount {
var volumeMounts []corev1.VolumeMount
_, volumeMounts = appendPodInfoVolumesVolumeMounts(nil, volumeMounts)
if len(spec.PersistentVolumes) == 0 {
_, volumeMount := getDefaultVolumesVolumeMounts(componentType)
volumeMounts = append(volumeMounts, volumeMount...)
return volumeMounts
}
dorisPersistentVolumes, _ := GenerateEveryoneMountPathDorisPersistentVolume(&spec, sharedPaths, config, componentType)
for _, dorisPersistentVolume := range dorisPersistentVolumes {
var volumeMount corev1.VolumeMount
volumeMount.MountPath = dorisPersistentVolume.MountPath
volumeMount.Name = dorisPersistentVolume.Name
volumeMounts = append(volumeMounts, volumeMount)
}
return volumeMounts
}
// dst array have high priority will cover the src env when the env's name is right.
func mergeEnvs(src []corev1.EnvVar, dst []corev1.EnvVar) []corev1.EnvVar {
if len(dst) == 0 {
return src
}
if len(src) == 0 {
return dst
}
m := make(map[string]bool, len(dst))
for _, env := range dst {
m[env.Name] = true
}
for _, env := range src {
if _, ok := m[env.Name]; ok {
continue
}
dst = append(dst, env)
}
return dst
}
func newBaseInitContainer(name string, si *v1.SystemInitialization) corev1.Container {
enablePrivileged := true
initImage := si.InitImage
if initImage == "" {
initImage = DEFAULT_INIT_IMAGE
}
c := corev1.Container{
Image: initImage,
Name: name,
Command: si.Command,
ImagePullPolicy: corev1.PullIfNotPresent,
Args: si.Args,
SecurityContext: &corev1.SecurityContext{
Privileged: &enablePrivileged,
},
}
return c
}
func NewBaseMainContainer(dcr *v1.DorisCluster, config map[string]interface{}, componentType v1.ComponentType) corev1.Container {
command, args := getCommand(componentType)
var spec v1.BaseSpec
var skipInit bool
switch componentType {
case v1.Component_FE:
spec = dcr.Spec.FeSpec.BaseSpec
case v1.Component_BE:
spec = dcr.Spec.BeSpec.BaseSpec
skipInit = dcr.Spec.BeSpec.SkipDefaultSystemInit
case v1.Component_CN:
spec = dcr.Spec.CnSpec.BaseSpec
skipInit = dcr.Spec.BeSpec.SkipDefaultSystemInit
case v1.Component_Broker:
spec = dcr.Spec.BrokerSpec.BaseSpec
default:
}
_, sharedVolumeMounts, sharedPaths := BuildSharedVolumesAndVolumeMounts(dcr.Spec.SharedPersistentVolumeClaims, componentType)
volumeMounts := buildVolumeMounts(spec, sharedPaths, config, componentType)
var envs []corev1.EnvVar
envs = append(envs, buildBaseEnvs(dcr)...)
envs = append(envs, buildKerberosEnv(dcr.Spec.KerberosInfo, config, componentType)...)
envs = mergeEnvs(envs, spec.EnvVars)
if skipInit {
// Only works when the doris version is higher than 2.1.8 or 3.0.4
// When the environment variable SKIP_CHECK_ULIMIT=true is passed in, the start_be.sh will not check system parameters like ulimit and vm.max_map_count etc.
envs = append(envs, corev1.EnvVar{Name: "SKIP_CHECK_ULIMIT", Value: "true"})
}
if len(GetMountConfigMapInfo(spec.ConfigMapInfo)) != 0 {
_, configVolumeMounts := getMultiConfigVolumeAndVolumeMount(&spec.ConfigMapInfo, componentType)
volumeMounts = append(volumeMounts, configVolumeMounts...)
}
if len(spec.Secrets) != 0 {
_, secretVolumeMounts := getMultiSecretVolumeAndVolumeMount(&spec, componentType)
volumeMounts = append(volumeMounts, secretVolumeMounts...)
}
if dcr.Spec.KerberosInfo != nil {
_, kerberosVolumeMounts := getKerberosVolumeAndVolumeMount(dcr.Spec.KerberosInfo)
volumeMounts = append(volumeMounts, kerberosVolumeMounts...)
}
// add basic auth secret volumeMount
if dcr.Spec.AuthSecret != "" {
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: auth_volume_name,
MountPath: basic_auth_path,
})
}
if len(sharedVolumeMounts) != 0 {
volumeMounts = append(volumeMounts, sharedVolumeMounts...)
}
c := corev1.Container{
Image: spec.Image,
Name: string(componentType),
Command: command,
Args: args,
Ports: []corev1.ContainerPort{},
Env: envs,
VolumeMounts: volumeMounts,
ImagePullPolicy: corev1.PullIfNotPresent,
Resources: spec.ResourceRequirements,
}
//livenessPort use heartbeat port for probe service alive.
var livenessPort int32
//readnessPort use http port for confirm the service can provider service to client.
var readnessPort int32
var prestopScript string
var health_api_path string
var liveProbeType ProbeType
var readinessProbeType ProbeType
var commands []string
switch componentType {
case v1.Component_FE:
readnessPort = GetPort(config, HTTP_PORT)
livenessPort = GetPort(config, QUERY_PORT)
liveProbeType = TcpSocket
readinessProbeType = HttpGet
prestopScript = FE_PRESTOP
health_api_path = HEALTH_API_PATH
case v1.Component_BE, v1.Component_CN:
readnessPort = GetPort(config, WEBSERVER_PORT)
livenessPort = GetPort(config, HEARTBEAT_SERVICE_PORT)
liveProbeType = TcpSocket
readinessProbeType = HttpGet
prestopScript = BE_PRESTOP
health_api_path = HEALTH_API_PATH
case v1.Component_Broker:
livenessPort = GetPort(config, BROKER_IPC_PORT)
readnessPort = GetPort(config, BROKER_IPC_PORT)
liveProbeType = Exec
readinessProbeType = Exec
prestopScript = BROKER_PRESTOP
commands = append(commands, HEALTH_BROKER_LIVE_COMMAND, strconv.Itoa(int(livenessPort)))
default:
klog.Infof("the componentType %s is not supported in probe.", componentType)
}
// if tcpSocket the health_api_path will ignore.
c.LivenessProbe = livenessProbe(livenessPort, spec.LiveTimeout, health_api_path, commands, liveProbeType)
// use liveness as startup, when in debugging mode will not be killed
c.StartupProbe = startupProbe(livenessPort, spec.StartTimeout, health_api_path, commands, liveProbeType)
c.ReadinessProbe = readinessProbe(readnessPort, health_api_path, commands, readinessProbeType)
c.Lifecycle = lifeCycle(prestopScript)
return c
}
func buildBaseEnvs(dcr *v1.DorisCluster) []corev1.EnvVar {
defaultEnvs := buildEnvFromPod()
if dcr.Spec.AdminUser != nil {
defaultEnvs = append(defaultEnvs, corev1.EnvVar{
Name: ADMIN_USER,
Value: dcr.Spec.AdminUser.Name,
})
if dcr.Spec.AdminUser.Password != "" {
defaultEnvs = append(defaultEnvs, corev1.EnvVar{
Name: ADMIN_PASSWD,
Value: dcr.Spec.AdminUser.Password,
})
}
} else {
defaultEnvs = append(defaultEnvs, []corev1.EnvVar{{
Name: ADMIN_USER,
Value: DEFAULT_ADMIN_USER,
}, {
Name: DORIS_ROOT_KEY,
Value: DEFAULT_ROOT_PATH,
}}...)
}
return defaultEnvs
}
func buildKerberosEnv(info *v1.KerberosInfo, config map[string]interface{}, componentType v1.ComponentType) []corev1.EnvVar {
if info == nil {
return nil
}
var krb5ConfPath string
switch componentType {
case v1.Component_FE:
krb5ConfPath = kerberos.GetKrb5ConfFromJavaOpts(config)
case v1.Component_BE, v1.Component_CN:
// be config krb5.conf file must set 'kerberos_krb5_conf_path' in be.conf
// https://doris.apache.org/docs/3.0/lakehouse/datalake-analytics/hive?_highlight=kerberos_krb5_conf_path#connect-to-kerberos-enabled-hive
if value, exists := config["kerberos_krb5_conf_path"]; exists {
krb5ConfPath = value.(string)
} else {
krb5ConfPath = kerberos.KRB5_DEFAULT_CONFIG
}
}
keytabFinalUsedPath := keytab_default_mount_path
if info.KeytabPath != "" {
keytabFinalUsedPath = info.KeytabPath
}
return []corev1.EnvVar{
{
Name: KRB5_MOUNT_PATH,
Value: krb5_default_mount_path,
},
{
Name: KRB5_CONFIG,
Value: krb5ConfPath,
},
{
Name: KEYTAB_MOUNT_PATH,
Value: keytab_default_mount_path,
},
{
Name: KEYTAB_FINAL_USED_PATH,
Value: keytabFinalUsedPath,
},
}
}
func buildEnvFromPod() []corev1.EnvVar {
return []corev1.EnvVar{
{
Name: POD_NAME,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"},
},
},
{
Name: POD_IP,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"},
},
},
{
Name: HOST_IP,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"},
},
},
{
Name: POD_NAMESPACE,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"},
},
},
{
Name: config_env_name,
Value: config_env_path,
},
}
}
// GetPodDefaultEnv is currently only used in disaggregated
func GetPodDefaultEnv() []corev1.EnvVar {
return buildEnvFromPod()
}
func getCommand(componentType v1.ComponentType) (commands []string, args []string) {
switch componentType {
case v1.Component_FE:
return []string{"/opt/apache-doris/fe_entrypoint.sh"}, []string{"$(ENV_FE_ADDR)"}
case v1.Component_BE, v1.Component_CN:
return []string{"/opt/apache-doris/be_entrypoint.sh"}, []string{"$(ENV_FE_ADDR)"}
case v1.Component_Broker:
return []string{"/opt/apache-doris/broker_entrypoint.sh"}, []string{"$(ENV_FE_ADDR)"}
default:
klog.Infof("getCommand the componentType %s is not supported.", componentType)
return []string{}, []string{}
}
}
func GeneratePodTemplateName(dcr *v1.DorisCluster, componentType v1.ComponentType) string {
switch componentType {
case v1.Component_FE:
return dcr.Name + "-" + string(v1.Component_FE)
case v1.Component_BE:
return dcr.Name + "-" + string(v1.Component_BE)
case v1.Component_CN:
return dcr.Name + "-" + string(v1.Component_CN)
case v1.Component_Broker:
return dcr.Name + "-" + string(v1.Component_Broker)
default:
return ""
}
}
func getBaseSpecFromCluster(dcr *v1.DorisCluster, componentType v1.ComponentType) *v1.BaseSpec {
var bSpec *v1.BaseSpec
switch componentType {
case v1.Component_FE:
bSpec = &dcr.Spec.FeSpec.BaseSpec
case v1.Component_BE:
bSpec = &dcr.Spec.BeSpec.BaseSpec
case v1.Component_CN:
bSpec = &dcr.Spec.CnSpec.BaseSpec
case v1.Component_Broker:
bSpec = &dcr.Spec.BrokerSpec.BaseSpec
default:
klog.Infof("the componentType %s is not supported!", componentType)
}
return bSpec
}
func getDefaultVolumesVolumeMounts(componentType v1.ComponentType) ([]corev1.Volume, []corev1.VolumeMount) {
switch componentType {
case v1.Component_FE:
return getFeDefaultVolumesVolumeMounts()
case v1.Component_BE, v1.Component_CN:
return getBeDefaultVolumesVolumeMounts()
default:
klog.Infof("GetDefaultVolumesVolumeMountsAndPersistentVolumeClaims componentType %s not supported.", componentType)
return nil, nil
}
}
func getFeDefaultVolumesVolumeMounts() ([]corev1.Volume, []corev1.VolumeMount) {
volumes := []corev1.Volume{
{
Name: fe_meta_name,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
}
volumMounts := []corev1.VolumeMount{
{
Name: fe_meta_name,
MountPath: fe_meta_path,
},
}
return volumes, volumMounts
}
func appendPodInfoVolumesVolumeMounts(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) ([]corev1.Volume, []corev1.VolumeMount) {
if volumes == nil {
var _ []corev1.Volume
}
if volumeMounts == nil {
var _ []corev1.VolumeMount
}
volumes = append(volumes, corev1.Volume{
Name: POD_INFO_VOLUME_NAME,
VolumeSource: corev1.VolumeSource{
DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "labels",
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.labels",
},
}, {
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.annotations",
},
}},
},
},
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: POD_INFO_VOLUME_NAME,
MountPath: POD_INFO_PATH,
})
return volumes, volumeMounts
}
func getBeDefaultVolumesVolumeMounts() ([]corev1.Volume, []corev1.VolumeMount) {
volumes := []corev1.Volume{
{
Name: be_storage_name,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
}
volumeMounts := []corev1.VolumeMount{
{
Name: be_storage_name,
MountPath: be_storage_path,
},
}
return volumes, volumeMounts
}
func getMultiConfigVolumeAndVolumeMount(cmInfo *v1.ConfigMapInfo, componentType v1.ComponentType) ([]corev1.Volume, []corev1.VolumeMount) {
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
if cmInfo == nil {
return volumes, volumeMounts
}
cms := GetMountConfigMapInfo(*cmInfo)
if len(cms) != 0 {
defaultMountPath := ""
switch componentType {
case v1.Component_FE, v1.Component_BE, v1.Component_CN, v1.Component_Broker:
defaultMountPath = config_env_path
default:
klog.Infof("getConfigVolumeAndVolumeMount componentType %s not supported.", componentType)
}
for _, cm := range cms {
path := cm.MountPath
if cm.MountPath == "" {
path = defaultMountPath
}
volumes = append(
volumes,
corev1.Volume{
Name: cm.ConfigMapName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cm.ConfigMapName,
},
},
},
},
)
volumeMounts = append(
volumeMounts,
corev1.VolumeMount{
Name: cm.ConfigMapName,
MountPath: path,
},
)
}
}
return volumes, volumeMounts
}
func getMultiSecretVolumeAndVolumeMount(bSpec *v1.BaseSpec, componentType v1.ComponentType) ([]corev1.Volume, []corev1.VolumeMount) {
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
defaultMountPath := ""
switch componentType {
case v1.Component_FE, v1.Component_BE, v1.Component_CN, v1.Component_Broker:
defaultMountPath = secret_config_path
default:
klog.Infof("getMultiSecretVolumeAndVolumeMount componentType %s not supported.", componentType)
}
for _, secret := range bSpec.Secrets {
path := secret.MountPath
if secret.MountPath == "" {
path = defaultMountPath
}
volumes = append(
volumes,
corev1.Volume{
Name: secret.SecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secret.SecretName,
},
},
},
)
volumeMounts = append(
volumeMounts,
corev1.VolumeMount{
Name: secret.SecretName,
MountPath: path,
},
)
}
return volumes, volumeMounts
}
func GetMultiSecretVolumeAndVolumeMountWithCommonSpec(cSpec *dv1.CommonSpec) ([]corev1.Volume, []corev1.VolumeMount) {
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
defaultMountPath := secret_config_path
for _, secret := range cSpec.Secrets {
path := secret.MountPath
if secret.MountPath == "" {
path = defaultMountPath
}
volumes = append(
volumes,
corev1.Volume{
Name: secret.SecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secret.SecretName,
},
},
},
)
volumeMounts = append(
volumeMounts,
corev1.VolumeMount{
Name: secret.SecretName,
MountPath: path,
},
)
}
return volumes, volumeMounts
}
func getKerberosVolumeAndVolumeMount(kerberosInfo *v1.KerberosInfo) ([]corev1.Volume, []corev1.VolumeMount) {
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
// krb5
volumes = append(volumes, corev1.Volume{
Name: krb5_volume_name,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: kerberosInfo.Krb5ConfigMap,
},
},
},
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: krb5_volume_name,
MountPath: krb5_default_mount_path,
})
// keytab
volumes = append(volumes, corev1.Volume{
Name: keytab_volume_name,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: kerberosInfo.KeytabSecretName,
},
},
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: keytab_volume_name,
MountPath: keytab_default_mount_path,
})
return volumes, volumeMounts
}
func LivenessProbe(port, timeout int32, path string, commands []string, pt ProbeType) *corev1.Probe {
return livenessProbe(port, timeout, path, commands, pt)
}
func ReadinessProbe(port int32, path string, commands []string, pt ProbeType) *corev1.Probe {
return readinessProbe(port, path, commands, pt)
}
// StartupProbe returns a startup probe.
func startupProbe(port, timeout int32, path string, commands []string, pt ProbeType) *corev1.Probe {
var failurethreshold int32
if timeout < 300 {
timeout = 300
}
failurethreshold = timeout / 5
return &corev1.Probe{
FailureThreshold: failurethreshold,
PeriodSeconds: 5,
ProbeHandler: getProbe(port, path, commands, pt),
}
}
// livenessProbe returns a liveness.
func livenessProbe(port, timeout int32, path string, commands []string, pt ProbeType) *corev1.Probe {
if timeout < 1 {
timeout = 180
}
return &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
// for pulling image and start doris
InitialDelaySeconds: 80,
TimeoutSeconds: timeout,
ProbeHandler: getProbe(port, path, commands, pt),
}
}
// ReadinessProbe returns a readiness probe.
func readinessProbe(port int32, path string, commands []string, pt ProbeType) *corev1.Probe {
return &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
ProbeHandler: getProbe(port, path, commands, pt),
}
}
// LifeCycle returns a lifecycle.
func lifeCycle(preStopScriptPath string) *corev1.Lifecycle {
return &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{preStopScriptPath},
},
},
}
}
func LifeCycleWithPreStopScript(lc *corev1.Lifecycle, preStopScript string) *corev1.Lifecycle {
if lc == nil {
lc = &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{preStopScript},
},
},
}
return lc
}
lc.PreStop = &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{preStopScript},
},
}
return lc
}
// getProbe describe a health check.
func getProbe(port int32, path string, commands []string, pt ProbeType) corev1.ProbeHandler {
switch pt {
case TcpSocket:
return getTcpSocket(port)
case HttpGet:
return getHttpProbe(port, path)
case Exec:
return getExecProbe(commands)
default:
}
return corev1.ProbeHandler{}
}
func getTcpSocket(port int32) corev1.ProbeHandler {
return corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt32(port),
},
}
}
func getHttpProbe(port int32, path string) corev1.ProbeHandler {
var p corev1.ProbeHandler
if path != "" {
p = corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: path,
Port: intstr.IntOrString{
Type: intstr.Int,
IntVal: port,
},
},
}
}
return p
}
func getExecProbe(commands []string) corev1.ProbeHandler {
if len(commands) == 0 {
return corev1.ProbeHandler{}
}
return corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: commands,
},
}
}
func BuildDisaggregatedProbe(container *corev1.Container, cs *dv1.CommonSpec, componentType dv1.DisaggregatedComponentType) {
var failurethreshold int32
startTimeout := int32(300)
liveTimeout := cs.LiveTimeout
if cs.StartTimeout >= 300 {
startTimeout = cs.StartTimeout
}
failurethreshold = startTimeout / 5
if liveTimeout < 1 {
liveTimeout = 180
}
var commend string
switch componentType {
case dv1.DisaggregatedFE:
commend = HEALTH_DISAGGREGATED_FE_PROBE_COMMAND
case dv1.DisaggregatedBE:
commend = HEALTH_DISAGGREGATED_BE_PROBE_COMMAND
case dv1.DisaggregatedMS:
commend = HEALTH_DISAGGREGATED_MS_PROBE_COMMAND
default:
}
// check running status
alive := corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{commend, DISAGGREGATED_LIVE_PARAM_ALIVE},
},
}
// check ready status
ready := corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{commend, DISAGGREGATED_LIVE_PARAM_READY},
},
}
container.LivenessProbe = &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
InitialDelaySeconds: 80,
TimeoutSeconds: liveTimeout,
ProbeHandler: alive,
}
container.StartupProbe = &corev1.Probe{
FailureThreshold: failurethreshold,
PeriodSeconds: 5,
ProbeHandler: alive,
}
container.ReadinessProbe = &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
ProbeHandler: ready,
}
}
func getDefaultAffinity(componentType v1.ComponentType) *corev1.Affinity {
// default Affinity rule is :
// Pods of the same component should deploy on different hosts with Preferred scheduling.
// weight is 20, weight range is 1-100
podAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 20,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{Key: v1.ComponentLabelKey, Operator: metav1.LabelSelectorOpIn, Values: []string{string(componentType)}},
},
},
TopologyKey: NODE_TOPOLOGYKEY,
},
}
return &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{podAffinityTerm},
},
}
}
func constructAffinity(dcrAffinity *corev1.Affinity, componentType v1.ComponentType) *corev1.Affinity {
affinity := getDefaultAffinity(componentType)
if dcrAffinity == nil {
return affinity
}
dcrPodAntiAffinity := dcrAffinity.PodAntiAffinity
if dcrPodAntiAffinity != nil {
affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = dcrPodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, dcrPodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
}
affinity.NodeAffinity = dcrAffinity.NodeAffinity
affinity.PodAffinity = dcrAffinity.PodAffinity
return affinity
}
func constructBeDefaultInitContainer(defaultImage string) corev1.Container {
return newBaseInitContainer(
"default-init",
&v1.SystemInitialization{
Command: []string{"/bin/sh"},
InitImage: defaultImage,
Args: []string{"-c", "sysctl -w vm.max_map_count=2000000 && swapoff -a"},
},
)
}