pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go (71 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package computegroups import ( dv1 "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/resource" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" ) func (dcgs *DisaggregatedComputeGroupsController) newService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service { uniqueId := cg.UniqueId svcConf := cg.CommonSpec.Service sps := newComputeServicePorts(cvs, svcConf) svc := dcgs.NewDefaultService(ddc) ob := &svc.ObjectMeta ob.Name = ddc.GetCGServiceName(cg) ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId) spec := &svc.Spec spec.Selector = dcgs.newCGPodsSelector(ddc.Name, uniqueId) spec.Ports = sps if svcConf != nil && svcConf.Type != "" { svc.Spec.Type = svcConf.Type } if svcConf != nil { svc.Annotations = svcConf.Annotations } // The external load balancer provided by the cloud provider may cause the client IP received by the service to change. if svc.Spec.Type == corev1.ServiceTypeLoadBalancer { svc.Spec.SessionAffinity = corev1.ServiceAffinityNone } return svc } // new ports by start config that mounted into container by configMap. func newComputeServicePorts(cvs map[string]interface{}, svcConf *dv1.ExportService) []corev1.ServicePort { bePort := resource.GetPort(cvs, resource.BE_PORT) webserverPort := resource.GetPort(cvs, resource.WEBSERVER_PORT) heartbeatPort := resource.GetPort(cvs, resource.HEARTBEAT_SERVICE_PORT) brpcPort := resource.GetPort(cvs, resource.BRPC_PORT) arrowFlightPort := resource.GetPort(cvs, resource.ARROW_FLIGHT_SQL_PORT) sps := []corev1.ServicePort{{ Name: resource.GetPortKey(resource.BE_PORT), TargetPort: intstr.FromInt32(bePort), Port: bePort, }, { Name: resource.GetPortKey(resource.WEBSERVER_PORT), TargetPort: intstr.FromInt32(webserverPort), Port: webserverPort, }, { Name: resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT), TargetPort: intstr.FromInt32(heartbeatPort), Port: heartbeatPort, }, { Name: resource.GetPortKey(resource.BRPC_PORT), TargetPort: intstr.FromInt32(brpcPort), Port: brpcPort, }} if arrowFlightPort != -1 { sps = append(sps, corev1.ServicePort{ Name: resource.GetPortKey(resource.ARROW_FLIGHT_SQL_PORT), TargetPort: intstr.FromInt32(arrowFlightPort), Port: arrowFlightPort, }) } if svcConf == nil || svcConf.Type != corev1.ServiceTypeNodePort { return sps } for i, _ := range sps { for j, _ := range svcConf.PortMaps { if sps[i].Port == svcConf.PortMaps[j].TargetPort { sps[i].NodePort = svcConf.PortMaps[j].NodePort } } } return sps }