pkg/controller/sub_controller/disaggregated_cluster/computegroups/statefulset.go (149 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package computegroups
import (
"fmt"
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/resource"
sub "github.com/apache/doris-operator/pkg/controller/sub_controller"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"strconv"
)
const (
// basic_auth_path is the path inside the compute container at which the
// basic-auth secret volume is mounted.
basic_auth_path = "/etc/basic_auth"
// auth_volume_name is the name of the pod volume (and of the matching
// volume mount) that carries the secret referenced by the cluster's
// Spec.AuthSecret.
auth_volume_name = "basic-auth"
)
// newCG2LayerSchedulerLabels generates the labels for a compute group's
// statefulset or service: the common labels returned by
// GetCG2LayerCommonSchedulerLabels for ddcName, extended with the compute
// group's unique id.
func (dcgs *DisaggregatedComputeGroupsController) newCG2LayerSchedulerLabels(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
	schedulerLabels := dcgs.GetCG2LayerCommonSchedulerLabels(ddcName)
	// Tag the shared label set with the owning compute group.
	schedulerLabels[dv1.DorisDisaggregatedComputeGroupUniqueId] = uniqueId
	return schedulerLabels
}
// GetCG2LayerCommonSchedulerLabels returns a newly allocated label map holding
// the cluster-name and owner-reference labels, both set to ddcName. Callers
// are free to add further keys to the returned map.
func (dcgs *DisaggregatedComputeGroupsController) GetCG2LayerCommonSchedulerLabels(ddcName string) map[string]string {
	commonLabels := make(map[string]string, 2)
	commonLabels[dv1.DorisDisaggregatedClusterName] = ddcName
	commonLabels[dv1.DorisDisaggregatedOwnerReference] = ddcName
	return commonLabels
}
// newCGPodsSelector returns a newly allocated label set identifying the pods
// of one compute group: the cluster name, the compute group's unique id, and
// the pod type "compute".
func (dcgs *DisaggregatedComputeGroupsController) newCGPodsSelector(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
	podSelector := make(map[string]string, 3)
	podSelector[dv1.DorisDisaggregatedClusterName] = ddcName
	podSelector[dv1.DorisDisaggregatedComputeGroupUniqueId] = uniqueId
	podSelector[dv1.DorisDisaggregatedPodType] = "compute"
	return podSelector
}
// NewStatefulset assembles the StatefulSet for compute group cg of cluster ddc.
//
// It starts from NewDefaultStatefulset(ddc) and then fills in the name and
// labels, the pod selector, the replica count, the volume claim templates,
// the governing service name, and the pod template. cvs holds the config
// values that are passed through to the volume and pod-template builders.
func (dcgs *DisaggregatedComputeGroupsController) NewStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *appv1.StatefulSet {
	sts := dcgs.NewDefaultStatefulset(ddc)
	cgUniqueId := cg.UniqueId
	podSelector := dcgs.newCGPodsSelector(ddc.Name, cgUniqueId)

	// Metadata.
	sts.Name = ddc.GetCGStatefulsetName(cg)
	sts.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, cgUniqueId)

	// Spec.
	// NOTE(review): the very same podSelector map is handed to
	// NewPodTemplateSpec below, which calls AddLabel on it; presumably any
	// label merged there also becomes part of Spec.Selector - confirm.
	sts.Spec.Selector = &metav1.LabelSelector{MatchLabels: podSelector}

	// Only the PVC templates are needed at the statefulset level.
	_, _, claimTemplates := dcgs.BuildVolumesVolumeMountsAndPVCs(cvs, dv1.DisaggregatedBE, &cg.CommonSpec)
	sts.Spec.Replicas = cg.Replicas
	sts.Spec.VolumeClaimTemplates = claimTemplates
	sts.Spec.ServiceName = ddc.GetCGServiceName(cg)
	sts.Spec.Template = dcgs.NewPodTemplateSpec(ddc, podSelector, cvs, cg)

	return sts
}
// NewPodTemplateSpec builds the pod template for compute group cg of cluster ddc.
//
// The template comes from resource.NewPodTemplateSpecWithCommonSpec and is
// extended with:
//   - labels: selector with the template's own labels added via AddLabel;
//   - the compute container from NewCGContainer;
//   - volumes, in this order: config-map volumes, the volumes from
//     BuildVolumesVolumeMountsAndPVCs, the basic-auth secret volume (only when
//     ddc.Spec.AuthSecret is set), and the volumes for cg.Secrets (only when
//     any are configured);
//   - the affinity returned by ConstructDefaultAffinity for the compute
//     group's unique id.
//
// NOTE(review): selector is converted to resource.Labels without copying, so
// AddLabel presumably writes into the caller's map - confirm this is intended.
func (dcgs *DisaggregatedComputeGroupsController) NewPodTemplateSpec(ddc *dv1.DorisDisaggregatedCluster, selector map[string]string, cvs map[string]interface{}, cg *dv1.ComputeGroup) corev1.PodTemplateSpec {
	tpl := resource.NewPodTemplateSpecWithCommonSpec(&cg.CommonSpec, dv1.DisaggregatedBE)

	// Pod template metadata.
	podLabels := resource.Labels(selector)
	podLabels.AddLabel(tpl.Labels)
	tpl.Labels = podLabels

	computeContainer := dcgs.NewCGContainer(ddc, cvs, cg)
	tpl.Spec.Containers = append(tpl.Spec.Containers, computeContainer)

	// Config-map volumes go first, followed by the regular volumes.
	storageVolumes, _, _ := dcgs.BuildVolumesVolumeMountsAndPVCs(cvs, dv1.DisaggregatedBE, &cg.CommonSpec)
	configMapVolumes, _ := dcgs.BuildDefaultConfigMapVolumesVolumeMounts(cg.ConfigMaps)
	tpl.Spec.Volumes = append(append(tpl.Spec.Volumes, configMapVolumes...), storageVolumes...)

	// Basic-auth secret volume; its mount is added by NewCGContainer.
	if ddc.Spec.AuthSecret != "" {
		authVolume := corev1.Volume{Name: auth_volume_name}
		authVolume.Secret = &corev1.SecretVolumeSource{SecretName: ddc.Spec.AuthSecret}
		tpl.Spec.Volumes = append(tpl.Spec.Volumes, authVolume)
	}

	if len(cg.Secrets) > 0 {
		secretVolumes, _ := resource.GetMultiSecretVolumeAndVolumeMountWithCommonSpec(&cg.CommonSpec)
		tpl.Spec.Volumes = append(tpl.Spec.Volumes, secretVolumes...)
	}

	uniqueIdKey := dv1.DorisDisaggregatedComputeGroupUniqueId
	tpl.Spec.Affinity = dcgs.ConstructDefaultAffinity(uniqueIdKey, selector[uniqueIdKey], tpl.Spec.Affinity)

	return tpl
}
// NewCGContainer builds the "compute" container for the pods of compute group
// cg in cluster ddc.
//
// Starting from resource.NewContainerWithCommonSpec it sets the pre-stop
// lifecycle hook, command and args, container ports (derived from cvs),
// environment, probes, and volume mounts.
//
// Environment order: cg.CommonSpec.EnvVars, then resource.GetPodDefaultEnv,
// then the values from newSpecificEnvs. The list is assembled in a freshly
// allocated slice so that appending never writes into the backing array of
// cg.CommonSpec.EnvVars.
//
// Volume mount order: mounts from BuildVolumesVolumeMountsAndPVCs, config-map
// mounts, the basic-auth mount (only when ddc.Spec.AuthSecret is set), and
// the mounts for cg.Secrets (only when any are configured).
//
// Side effect: when cg.EnableWorkloadGroup is true, cg.ContainerSecurityContext
// is allocated if nil and its Privileged field is set to true on the caller's
// cg before the container is derived from it.
func (dcgs *DisaggregatedComputeGroupsController) NewCGContainer(ddc *dv1.DorisDisaggregatedCluster, cvs map[string]interface{}, cg *dv1.ComputeGroup) corev1.Container {
	if cg.EnableWorkloadGroup {
		if cg.ContainerSecurityContext == nil {
			cg.ContainerSecurityContext = &corev1.SecurityContext{}
		}
		cg.ContainerSecurityContext.Privileged = pointer.Bool(true)
	}

	c := resource.NewContainerWithCommonSpec(&cg.CommonSpec)
	c.Lifecycle = resource.LifeCycleWithPreStopScript(c.Lifecycle, sub.GetDisaggregatedPreStopScript(dv1.DisaggregatedBE))
	c.Command, c.Args = sub.GetDisaggregatedCommand(dv1.DisaggregatedBE)
	c.Name = "compute"
	c.Ports = resource.GetDisaggregatedContainerPorts(cvs, dv1.DisaggregatedBE)

	// Copy the user-supplied envs into a new slice instead of appending to
	// cg.CommonSpec.EnvVars directly: if that slice has spare capacity, append
	// would store the operator's envs in the spec's own backing array.
	defaultEnvs := resource.GetPodDefaultEnv()
	specificEnvs := dcgs.newSpecificEnvs(ddc, cg)
	envs := make([]corev1.EnvVar, 0, len(cg.CommonSpec.EnvVars)+len(defaultEnvs)+len(specificEnvs))
	envs = append(envs, cg.CommonSpec.EnvVars...)
	envs = append(envs, defaultEnvs...)
	envs = append(envs, specificEnvs...)
	c.Env = envs

	resource.BuildDisaggregatedProbe(&c, &cg.CommonSpec, dv1.DisaggregatedBE)

	_, vms, _ := dcgs.BuildVolumesVolumeMountsAndPVCs(cvs, dv1.DisaggregatedBE, &cg.CommonSpec)
	_, cmvms := dcgs.BuildDefaultConfigMapVolumesVolumeMounts(cg.ConfigMaps)
	if vms == nil {
		// No regular mounts: use the config-map mounts slice as is.
		c.VolumeMounts = cmvms
	} else {
		c.VolumeMounts = append(vms, cmvms...)
	}

	// Add the basic auth secret volume mount.
	if ddc.Spec.AuthSecret != "" {
		c.VolumeMounts = append(c.VolumeMounts, corev1.VolumeMount{
			Name:      auth_volume_name,
			MountPath: basic_auth_path,
		})
	}

	if len(cg.Secrets) != 0 {
		_, secretVolumeMounts := resource.GetMultiSecretVolumeAndVolumeMountWithCommonSpec(&cg.CommonSpec)
		c.VolumeMounts = append(c.VolumeMounts, secretVolumeMounts...)
	}

	return c
}
// newSpecificEnvs returns the BE-specific environment variables; these envs
// are used by the be_disaggregated_entrypoint script.
//
// The result always contains the statefulset name, the compute group name,
// the FE address, and the FE query port (resolved from the FE config maps).
// The admin user name and password are appended when ddc.Spec.AdminUser is
// set, and the workload-group flag is appended when cg.EnableWorkloadGroup is
// true.
func (dcgs *DisaggregatedComputeGroupsController) newSpecificEnvs(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) []corev1.EnvVar {
	statefulsetName := ddc.GetCGStatefulsetName(cg)

	// Read the FE config to find the query port.
	feConfig := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.FE_RESOLVEKEY, ddc.Spec.FeSpec.ConfigMaps)
	feQueryPort := strconv.FormatInt(int64(resource.GetPort(feConfig, resource.QUERY_PORT)), 10)

	// Use the FE service name as the access address.
	feAddress := ddc.GetFEVIPAddresss()

	envs := []corev1.EnvVar{
		{Name: resource.STATEFULSET_NAME, Value: statefulsetName},
		{Name: resource.COMPUTE_GROUP_NAME, Value: ddc.GetCGName(cg)},
		{Name: resource.ENV_FE_ADDR, Value: feAddress},
		{Name: resource.ENV_FE_PORT, Value: feQueryPort},
	}

	// Add the user and password envs.
	if admin := ddc.Spec.AdminUser; admin != nil {
		envs = append(envs,
			corev1.EnvVar{Name: resource.ADMIN_USER, Value: admin.Name},
			corev1.EnvVar{Name: resource.ADMIN_PASSWD, Value: admin.Password},
		)
	}

	if cg.EnableWorkloadGroup {
		workloadGroupFlag := fmt.Sprintf("%t", cg.EnableWorkloadGroup)
		envs = append(envs, corev1.EnvVar{Name: resource.ENABLE_WORKLOAD_GROUP, Value: workloadGroupFlag})
	}

	return envs
}