pkg/controller/sub_controller/disaggregated_cluster/metaservice/statefulset.go (141 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package metaservice
import (
"context"
"github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/metadata"
"github.com/apache/doris-operator/pkg/common/utils/resource"
sc "github.com/apache/doris-operator/pkg/controller/sub_controller"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Names shared by the meta-service statefulset builders in this file.
const (
// defaultLogPrefixName is used as the name of the PersistentVolumeClaim
// template that newStatefulset generates when a persistent volume is set
// in the meta-service spec.
defaultLogPrefixName = "log"
// fdbClusterFileKey is the ConfigMap data key that newSpecificEnvs reads to
// obtain the FDB endpoint.
fdbClusterFileKey = "cluster-file"
//DefaultStorageSize int64 = 107374182400
)
// newMSPodsSelector returns the labels used to select the meta-service pods of
// the disaggregated cluster named ddcName: the cluster name, the pod type "ms"
// and the owner reference (also ddcName).
func (dms *DisaggregatedMSController) newMSPodsSelector(ddcName string) map[string]string {
	selector := make(map[string]string, 3)
	selector[v1.DorisDisaggregatedClusterName] = ddcName
	selector[v1.DorisDisaggregatedPodType] = "ms"
	selector[v1.DorisDisaggregatedOwnerReference] = ddcName
	return selector
}
// newMSSchedulerLabels returns the labels placed on the meta-service
// statefulset object itself: the cluster name and the pod type "ms".
func (dms *DisaggregatedMSController) newMSSchedulerLabels(ddcName string) map[string]string {
	labels := make(map[string]string, 2)
	labels[v1.DorisDisaggregatedClusterName] = ddcName
	labels[v1.DorisDisaggregatedPodType] = "ms"
	return labels
}
// newStatefulset assembles the meta-service StatefulSet for ddc.
//
// It starts from dms.NewDefaultStatefulset(ddc) and then fills in the name,
// labels, replica count, selector, pod template, governing service name and
// volume claim templates. confMap is forwarded unchanged to NewPodTemplateSpec.
func (dms *DisaggregatedMSController) newStatefulset(ddc *v1.DorisDisaggregatedCluster, confMap map[string]interface{}) *appv1.StatefulSet {
	sts := dms.NewDefaultStatefulset(ddc)
	sts.Name = ddc.GetMSStatefulsetName()
	sts.Labels = dms.newMSSchedulerLabels(ddc.Name)

	spec := ddc.Spec.MetaService
	podSelector := dms.newMSPodsSelector(ddc.Name)

	// A single claim template named defaultLogPrefixName is generated only when
	// a persistent volume is configured; otherwise the list stays nil.
	var claims []corev1.PersistentVolumeClaim
	if pv := spec.PersistentVolume; pv != nil {
		claims = append(claims, corev1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Name:        defaultLogPrefixName,
				Annotations: resource.NewAnnotations(),
			},
			Spec: pv.PersistentVolumeClaimSpec,
		})
	}

	// Fall back to the default replica count when the spec leaves it unset.
	replicaCount := spec.Replicas
	if replicaCount == nil {
		replicaCount = metadata.GetInt32Pointer(v1.DefaultMetaserviceNumber)
	}

	sts.Spec.Replicas = replicaCount
	// The very same map instance is handed to both the selector and the pod
	// template builder, exactly as before.
	sts.Spec.Selector = &metav1.LabelSelector{MatchLabels: podSelector}
	sts.Spec.Template = dms.NewPodTemplateSpec(ddc, podSelector, confMap)
	sts.Spec.ServiceName = ddc.GetMSServiceName()
	sts.Spec.VolumeClaimTemplates = claims

	return sts
}
// NewPodTemplateSpec builds the pod template for the meta-service statefulset.
//
// The template is seeded from the meta-service CommonSpec, labelled with
// selector merged with the template's own labels, given the meta-service
// container, and populated with the volumes derived from confMap, the
// configured ConfigMaps and (when present) the configured Secrets.
func (dms *DisaggregatedMSController) NewPodTemplateSpec(ddc *v1.DorisDisaggregatedCluster, selector map[string]string, confMap map[string]interface{}) corev1.PodTemplateSpec {
	commonSpec := &ddc.Spec.MetaService.CommonSpec
	tpl := resource.NewPodTemplateSpecWithCommonSpec(commonSpec, v1.DisaggregatedMS)

	// Pod template metadata.
	// NOTE(review): resource.Labels(selector) is a type conversion, not a copy,
	// so if AddLabel writes into its receiver the caller's selector map is
	// modified as well — confirm this is intended.
	podLabels := resource.Labels(selector)
	podLabels.AddLabel(tpl.Labels)
	tpl.Labels = podLabels

	tpl.Spec.Containers = append(tpl.Spec.Containers, dms.NewMSContainer(ddc, confMap))

	// Only the volumes are needed here; the matching volume mounts are attached
	// to the container in NewMSContainer.
	volumes, _, _ := dms.BuildVolumesVolumeMountsAndPVCs(confMap, v1.DisaggregatedMS, commonSpec)
	tpl.Spec.Volumes = append(tpl.Spec.Volumes, volumes...)

	cmVolumes, _ := dms.BuildDefaultConfigMapVolumesVolumeMounts(ddc.Spec.MetaService.ConfigMaps)
	tpl.Spec.Volumes = append(tpl.Spec.Volumes, cmVolumes...)

	clusterName := selector[v1.DorisDisaggregatedClusterName]
	tpl.Spec.Affinity = dms.ConstructDefaultAffinity(v1.DorisDisaggregatedClusterName, clusterName, ddc.Spec.MetaService.Affinity)

	if len(ddc.Spec.MetaService.Secrets) > 0 {
		secretVolumes, _ := resource.GetMultiSecretVolumeAndVolumeMountWithCommonSpec(commonSpec)
		tpl.Spec.Volumes = append(tpl.Spec.Volumes, secretVolumes...)
	}

	return tpl
}
// NewMSContainer builds the "metaservice" container for the meta-service pod.
//
// The container is seeded from the meta-service CommonSpec and then given a
// pre-stop lifecycle hook, the disaggregated start command and args, the ports
// derived from cvs, the environment (user-supplied variables first, then the
// pod defaults, then the variables from newSpecificEnvs), probes, and the
// volume mounts for the cvs-derived volumes, ConfigMaps and Secrets.
func (dms *DisaggregatedMSController) NewMSContainer(ddc *v1.DorisDisaggregatedCluster, cvs map[string]interface{}) corev1.Container {
	c := resource.NewContainerWithCommonSpec(&ddc.Spec.MetaService.CommonSpec)
	c.Lifecycle = resource.LifeCycleWithPreStopScript(c.Lifecycle, sc.GetDisaggregatedPreStopScript(v1.DisaggregatedMS))
	cmd, args := sc.GetDisaggregatedCommand(v1.DisaggregatedMS)
	c.Command = cmd
	c.Args = args
	c.Name = "metaservice"
	c.Ports = resource.GetDisaggregatedContainerPorts(cvs, v1.DisaggregatedMS)

	// Clip the capacity of the user-supplied slice so the appends below always
	// allocate a new backing array instead of writing into spare capacity of
	// the slice owned by the cluster spec. Contents and nil-ness are unchanged.
	specEnvs := ddc.Spec.MetaService.CommonSpec.EnvVars
	c.Env = specEnvs[:len(specEnvs):len(specEnvs)]
	c.Env = append(c.Env, resource.GetPodDefaultEnv()...)
	c.Env = append(c.Env, dms.newSpecificEnvs(ddc)...)

	resource.BuildDisaggregatedProbe(&c, &ddc.Spec.MetaService.CommonSpec, v1.DisaggregatedMS)

	// Only the volume mounts are needed here; the matching volumes are attached
	// to the pod template in NewPodTemplateSpec.
	_, vms, _ := dms.BuildVolumesVolumeMountsAndPVCs(cvs, v1.DisaggregatedMS, &ddc.Spec.MetaService.CommonSpec)
	_, cmvms := dms.BuildDefaultConfigMapVolumesVolumeMounts(ddc.Spec.MetaService.ConfigMaps)
	c.VolumeMounts = vms
	if c.VolumeMounts == nil {
		// Keep the ConfigMap mounts slice as-is (including an empty, non-nil
		// slice) when there are no other mounts.
		c.VolumeMounts = cmvms
	} else {
		c.VolumeMounts = append(c.VolumeMounts, cmvms...)
	}

	if len(ddc.Spec.MetaService.Secrets) != 0 {
		_, secretVolumeMounts := resource.GetMultiSecretVolumeAndVolumeMountWithCommonSpec(&ddc.Spec.MetaService.CommonSpec)
		c.VolumeMounts = append(c.VolumeMounts, secretVolumeMounts...)
	}

	return c
}
// newSpecificEnvs returns the meta-service specific environment variables,
// currently a single resource.FDB_ENDPOINT entry.
//
// The endpoint is taken from FDB.Address when it is set. Otherwise it is read
// from the fdbClusterFileKey entry of the ConfigMap referenced by
// FDB.ConfigMapNamespaceName. When neither source is configured, or the
// ConfigMap cannot be fetched or lacks the key, a warning event is recorded on
// ddc and nil is returned.
func (dms *DisaggregatedMSController) newSpecificEnvs(ddc *v1.DorisDisaggregatedCluster) []corev1.EnvVar {
	fdb := ddc.Spec.MetaService.FDB
	address := fdb.Address
	cmNamespace := fdb.ConfigMapNamespaceName.Namespace
	cmName := fdb.ConfigMapNamespaceName.Name
	hasConfigMap := cmNamespace != "" && cmName != ""

	// warn records a warning event on ddc with the FDB-not-configured reason.
	warn := func(msg string) {
		dms.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.FDBAddressNotConfiged), msg)
	}

	if address == "" && !hasConfigMap {
		warn("fdb not configed in spec")
		return nil
	}

	// An explicit address always takes precedence over the ConfigMap, so use it
	// directly: there is no need to fetch the ConfigMap, and a missing or
	// malformed ConfigMap must not discard a valid address.
	if address != "" {
		return []corev1.EnvVar{{
			Name:  resource.FDB_ENDPOINT,
			Value: address,
		}}
	}

	cmDesc := "configmap namespace " + cmNamespace + " name " + cmName
	cm, err := k8s.GetConfigMap(context.Background(), dms.K8sclient, cmNamespace, cmName)
	if err != nil {
		warn(cmDesc + " find failed " + err.Error())
		return nil
	}
	if cm.Data == nil {
		warn(cmDesc + " not have data.")
		return nil
	}
	endpoint, ok := cm.Data[fdbClusterFileKey]
	if !ok {
		warn(cmDesc + " not have cluster-file")
		return nil
	}

	return []corev1.EnvVar{{
		Name:  resource.FDB_ENDPOINT,
		Value: endpoint,
	}}
}