pkg/common/utils/resource/service.go (343 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package resource import ( "strings" dv1 "github.com/apache/doris-operator/api/disaggregated/v1" v1 "github.com/apache/doris-operator/api/doris/v1" "github.com/apache/doris-operator/pkg/common/utils/hash" "github.com/apache/doris-operator/pkg/common/utils/set" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" ) // HashService service hash components type hashService struct { name string namespace string ports []corev1.ServicePort selector map[string]string //deal with external access load balancer. serviceType corev1.ServiceType labels map[string]string annotations map[string]string } func BuildInternalService(dcr *v1.DorisCluster, componentType v1.ComponentType, config map[string]interface{}) corev1.Service { labels := v1.GenerateInternalServiceLabels(dcr, componentType) selector := v1.GenerateServiceSelector(dcr, componentType) //the k8s service type. return corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: v1.GenerateInternalCommunicateServiceName(dcr, componentType), Namespace: dcr.Namespace, Labels: labels, OwnerReferences: []metav1.OwnerReference{GetOwnerReference(dcr)}, }, Spec: corev1.ServiceSpec{ ClusterIP: "None", Ports: []corev1.ServicePort{ getInternalServicePort(config, componentType), }, Selector: selector, //value = true, Pod don't need to become ready that be search by domain. PublishNotReadyAddresses: true, }, } } func getInternalServicePort(config map[string]interface{}, componentType v1.ComponentType) corev1.ServicePort { switch componentType { case v1.Component_FE: return corev1.ServicePort{ Name: GetPortKey(QUERY_PORT), Port: GetPort(config, QUERY_PORT), TargetPort: intstr.FromInt32(GetPort(config, QUERY_PORT)), } case v1.Component_BE, v1.Component_CN: return corev1.ServicePort{ Name: GetPortKey(HEARTBEAT_SERVICE_PORT), Port: GetPort(config, HEARTBEAT_SERVICE_PORT), TargetPort: intstr.FromInt32(GetPort(config, HEARTBEAT_SERVICE_PORT)), } case v1.Component_Broker: return corev1.ServicePort{ Name: GetPortKey(BROKER_IPC_PORT), Port: GetPort(config, BROKER_IPC_PORT), TargetPort: intstr.FromInt32(GetPort(config, BROKER_IPC_PORT)), } default: klog.Infof("getInternalServicePort not supported the type %s", componentType) return corev1.ServicePort{} } } // BuildExternalService build the external service. not have selector func BuildExternalService(dcr *v1.DorisCluster, componentType v1.ComponentType, config map[string]interface{}) corev1.Service { labels := v1.GenerateExternalServiceLabels(dcr, componentType) selector := v1.GenerateServiceSelector(dcr, componentType) //the k8s service type. var ports []corev1.ServicePort var exportService *v1.ExportService switch componentType { case v1.Component_FE: exportService = dcr.Spec.FeSpec.Service ports = getFeServicePorts(config) case v1.Component_BE: //cn is be, but for user we should make them clear for ability recognition exportService = dcr.Spec.BeSpec.Service ports = getBeServicePorts(config) case v1.Component_CN: exportService = dcr.Spec.CnSpec.Service ports = getBeServicePorts(config) default: klog.Infof("BuildExternalService componentType %s not supported.", componentType) } svc := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: v1.GenerateExternalServiceName(dcr, componentType), Namespace: dcr.Namespace, Labels: labels, }, } constructServiceSpec(exportService, &svc, selector, ports) if exportService != nil { svc.Annotations = exportService.Annotations } svc.OwnerReferences = []metav1.OwnerReference{GetOwnerReference(dcr)} // the code is invalid, duplicate with ServiceDeepEqual //hso := serviceHashObject(&svc) //anno := map[string]string{} //anno[v1.ComponentResourceHash] = hash.HashObject(hso) //svc.Annotations = anno return svc } func constructServiceSpec(exportService *v1.ExportService, svc *corev1.Service, selector map[string]string, ports []corev1.ServicePort) { var exportPorts []v1.DorisServicePort if exportService != nil { exportPorts = exportService.ServicePorts } for _, ep := range exportPorts { for i, _ := range ports { if int(ep.TargetPort) == ports[i].TargetPort.IntValue() { ports[i].NodePort = ep.NodePort } } } svc.Spec = corev1.ServiceSpec{ Selector: selector, Ports: ports, SessionAffinity: corev1.ServiceAffinityClientIP, } // The external load balancer provided by the cloud provider may cause the client IP received by the service to change. if exportService != nil && exportService.Type == corev1.ServiceTypeLoadBalancer { svc.Spec.SessionAffinity = corev1.ServiceAffinityNone } setServiceType(exportService, svc) } func getFeServicePorts(config map[string]interface{}) (ports []corev1.ServicePort) { httpPort := GetPort(config, HTTP_PORT) rpcPort := GetPort(config, RPC_PORT) queryPort := GetPort(config, QUERY_PORT) editPort := GetPort(config, EDIT_LOG_PORT) arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT) ports = append(ports, corev1.ServicePort{ Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: GetPortKey(HTTP_PORT), }, corev1.ServicePort{ Port: rpcPort, TargetPort: intstr.FromInt32(rpcPort), Name: GetPortKey(RPC_PORT), }, corev1.ServicePort{ Port: queryPort, TargetPort: intstr.FromInt32(queryPort), Name: GetPortKey(QUERY_PORT), }, corev1.ServicePort{ Port: editPort, TargetPort: intstr.FromInt32(editPort), Name: GetPortKey(EDIT_LOG_PORT), }) if arrowFlightPort != -1 { ports = append(ports, corev1.ServicePort{ Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), }) } return } func getBeServicePorts(config map[string]interface{}) (ports []corev1.ServicePort) { bePort := GetPort(config, BE_PORT) webseverPort := GetPort(config, WEBSERVER_PORT) heartPort := GetPort(config, HEARTBEAT_SERVICE_PORT) brpcPort := GetPort(config, BRPC_PORT) arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT) ports = append(ports, corev1.ServicePort{ Port: bePort, TargetPort: intstr.FromInt32(bePort), Name: GetPortKey(BE_PORT), }, corev1.ServicePort{ Port: webseverPort, TargetPort: intstr.FromInt32(webseverPort), Name: GetPortKey(WEBSERVER_PORT), }, corev1.ServicePort{ Port: heartPort, TargetPort: intstr.FromInt32(heartPort), Name: GetPortKey(HEARTBEAT_SERVICE_PORT), }, corev1.ServicePort{ Port: brpcPort, TargetPort: intstr.FromInt32(brpcPort), Name: GetPortKey(BRPC_PORT), }) if arrowFlightPort != -1 { ports = append(ports, corev1.ServicePort{ Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), }) } return } func GetContainerPorts(config map[string]interface{}, componentType v1.ComponentType) []corev1.ContainerPort { switch componentType { case v1.Component_FE: return getFeContainerPorts(config) case v1.Component_BE: return getBeContainerPorts(config) case v1.Component_Broker: return getBrokerContainerPorts(config) default: klog.Infof("GetContainerPorts the componentType %s not supported.", componentType) return []corev1.ContainerPort{} } } func GetDisaggregatedContainerPorts(config map[string]interface{}, componentType dv1.DisaggregatedComponentType) []corev1.ContainerPort { switch componentType { case dv1.DisaggregatedFE: return getFeContainerPorts(config) case dv1.DisaggregatedBE: return getBeContainerPorts(config) case dv1.DisaggregatedMS: return getMetaServiceContainerPorts(config) default: return nil } } func getMetaServiceContainerPorts(config map[string]interface{}) []corev1.ContainerPort { return []corev1.ContainerPort{ { Name: GetPortKey(BROKER_IPC_PORT), ContainerPort: GetPort(config, BROKER_IPC_PORT), }, } } func getFeContainerPorts(config map[string]interface{}) []corev1.ContainerPort { ports := []corev1.ContainerPort{ { Name: GetPortKey(HTTP_PORT), ContainerPort: GetPort(config, HTTP_PORT), Protocol: corev1.ProtocolTCP, }, { Name: GetPortKey(RPC_PORT), ContainerPort: GetPort(config, RPC_PORT), Protocol: corev1.ProtocolTCP, }, { Name: GetPortKey(QUERY_PORT), ContainerPort: GetPort(config, QUERY_PORT), Protocol: corev1.ProtocolTCP, }, { Name: GetPortKey(EDIT_LOG_PORT), ContainerPort: GetPort(config, EDIT_LOG_PORT), Protocol: corev1.ProtocolTCP, }, } arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT) if arrowFlightPort != -1 { ports = append(ports, corev1.ContainerPort{ Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), ContainerPort: arrowFlightPort, Protocol: corev1.ProtocolTCP, }) } return ports } func getBeContainerPorts(config map[string]interface{}) []corev1.ContainerPort { ports := []corev1.ContainerPort{ { Name: GetPortKey(BE_PORT), ContainerPort: GetPort(config, BE_PORT), }, { Name: GetPortKey(WEBSERVER_PORT), ContainerPort: GetPort(config, WEBSERVER_PORT), Protocol: corev1.ProtocolTCP, }, { Name: GetPortKey(HEARTBEAT_SERVICE_PORT), ContainerPort: GetPort(config, HEARTBEAT_SERVICE_PORT), Protocol: corev1.ProtocolTCP, }, { Name: GetPortKey(BRPC_PORT), ContainerPort: GetPort(config, BRPC_PORT), Protocol: corev1.ProtocolTCP, }, } arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT) if arrowFlightPort != -1 { ports = append(ports, corev1.ContainerPort{ Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), ContainerPort: arrowFlightPort, Protocol: corev1.ProtocolTCP, }) } return ports } func getBrokerContainerPorts(config map[string]interface{}) []corev1.ContainerPort { return []corev1.ContainerPort{ { Name: GetPortKey(BROKER_IPC_PORT), ContainerPort: GetPort(config, BROKER_IPC_PORT), }, } } func GetPortKey(configKey string) string { switch configKey { case BE_PORT: return strings.ReplaceAll(BE_PORT, "_", "-") case WEBSERVER_PORT: return strings.ReplaceAll(WEBSERVER_PORT, "_", "-") case HEARTBEAT_SERVICE_PORT: return "heartbeat-port" case BRPC_PORT: return strings.ReplaceAll(BRPC_PORT, "_", "-") case HTTP_PORT: return strings.ReplaceAll(HTTP_PORT, "_", "-") case QUERY_PORT: return strings.ReplaceAll(QUERY_PORT, "_", "-") case RPC_PORT: return strings.ReplaceAll(RPC_PORT, "_", "-") case EDIT_LOG_PORT: return strings.ReplaceAll(EDIT_LOG_PORT, "_", "-") case BROKER_IPC_PORT: return strings.ReplaceAll(BROKER_IPC_PORT, "_", "-") case BRPC_LISTEN_PORT: return "brpc-port" case ARROW_FLIGHT_SQL_PORT: return "arrow-flight" default: return "" } } func setServiceType(svc *v1.ExportService, service *corev1.Service) { service.Spec.Type = corev1.ServiceTypeClusterIP if svc != nil && svc.Type != "" { service.Spec.Type = svc.Type } if service.Spec.Type == corev1.ServiceTypeLoadBalancer && svc.LoadBalancerIP != "" { service.Spec.LoadBalancerIP = svc.LoadBalancerIP } } func ServiceDeepEqual(newSvc, oldSvc *corev1.Service) bool { return ServiceDeepEqualWithAnnoKey(newSvc, oldSvc, v1.ComponentResourceHash) } func ServiceDeepEqualWithAnnoKey(nsvc, osvc *corev1.Service, annoKey string) bool { if annoKey == "" { annoKey = v1.ComponentResourceHash } var newHashValue, oldHashValue string if _, ok := nsvc.Annotations[annoKey]; ok { newHashValue = nsvc.Annotations[annoKey] } else { newHashService := serviceHashObject(nsvc, set.NewSetString(annoKey)) newHashValue = hash.HashObject(newHashService) } if _, ok := osvc.Annotations[annoKey]; ok { oldHashValue = osvc.Annotations[annoKey] } else { oldHashService := serviceHashObject(osvc, set.NewSetString(annoKey)) oldHashValue = hash.HashObject(oldHashService) } // set hash value in annotation for avoiding deep equal. nsvc.Annotations = mergeMaps(nsvc.Annotations, map[string]string{annoKey: newHashValue}) return newHashValue == oldHashValue && nsvc.Namespace == osvc.Namespace } // hash service for diff new generate service and old service in kubernetes. func serviceHashObject(svc *corev1.Service, avoidAnnoKeys *set.SetString) hashService { annos := make(map[string]string, len(svc.Annotations)) //for support service annotations, avoid hash value in annotations interfere equal comparison. for key, value := range svc.Annotations { if ok := avoidAnnoKeys.Find(key); ok { continue } annos[key] = value } return hashService{ name: svc.Name, namespace: svc.Namespace, ports: svc.Spec.Ports, selector: svc.Spec.Selector, serviceType: svc.Spec.Type, labels: svc.Labels, annotations: annos, } }