func()

in pkg/ingress/config/ingress_config.go [731:844]


func (m *IngressConfig) convertDestinationRule(configs []common.WrapperConfig) []config.Config {
	convertOptions := common.ConvertOptions{
		Service2TrafficPolicy: map[common.ServiceKey]*common.WrapperTrafficPolicy{},
	}

	// Convert destination from service within ingress rule.
	for idx := range configs {
		cfg := configs[idx]
		clusterId := common.GetClusterId(cfg.Config.Annotations)
		m.mutex.RLock()
		ingressController := m.remoteIngressControllers[clusterId]
		m.mutex.RUnlock()
		if ingressController == nil {
			continue
		}
		if err := ingressController.ConvertTrafficPolicy(&convertOptions, &cfg); err != nil {
			IngressLog.Errorf("Convert ingress %s/%s to destination rule fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err)
		}
	}

	IngressLog.Debugf("traffic policy number %d", len(convertOptions.Service2TrafficPolicy))

	for _, wrapperTrafficPolicy := range convertOptions.Service2TrafficPolicy {
		m.annotationHandler.ApplyTrafficPolicy(wrapperTrafficPolicy.TrafficPolicy, wrapperTrafficPolicy.PortTrafficPolicy, wrapperTrafficPolicy.WrapperConfig.AnnotationsConfig)
	}

	// Merge multi-port traffic policy per service into one destination rule.
	destinationRules := map[string]*common.WrapperDestinationRule{}
	for key, wrapperTrafficPolicy := range convertOptions.Service2TrafficPolicy {
		var serviceName string
		if key.ServiceFQDN != "" {
			serviceName = key.ServiceFQDN
		} else {
			serviceName = util.CreateServiceFQDN(key.Namespace, key.Name)
		}
		dr, exist := destinationRules[serviceName]
		if !exist {
			trafficPolicy := &networking.TrafficPolicy{}
			if wrapperTrafficPolicy.PortTrafficPolicy != nil {
				trafficPolicy.PortLevelSettings = []*networking.TrafficPolicy_PortTrafficPolicy{wrapperTrafficPolicy.PortTrafficPolicy}
			} else if wrapperTrafficPolicy.TrafficPolicy != nil {
				trafficPolicy = wrapperTrafficPolicy.TrafficPolicy
			}
			dr = &common.WrapperDestinationRule{
				DestinationRule: &networking.DestinationRule{
					Host:          serviceName,
					TrafficPolicy: trafficPolicy,
				},
				WrapperConfig: wrapperTrafficPolicy.WrapperConfig,
				ServiceKey:    key,
			}
		} else if wrapperTrafficPolicy.PortTrafficPolicy != nil {
			dr.DestinationRule.TrafficPolicy.PortLevelSettings = append(dr.DestinationRule.TrafficPolicy.PortLevelSettings, wrapperTrafficPolicy.PortTrafficPolicy)
		}

		destinationRules[serviceName] = dr
	}

	if m.RegistryReconciler != nil {
		drws := m.RegistryReconciler.GetAllDestinationRuleWrapper()
		for _, destinationRuleWrapper := range drws {
			serviceName := destinationRuleWrapper.ServiceKey.ServiceFQDN
			dr, exist := destinationRules[serviceName]
			if !exist {
				destinationRules[serviceName] = destinationRuleWrapper
			} else if dr.DestinationRule.TrafficPolicy != nil {
				if dr.DestinationRule.TrafficPolicy.LoadBalancer == nil &&
					destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer != nil {
					dr.DestinationRule.TrafficPolicy.LoadBalancer = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer
				}
				portTrafficPolicy := destinationRuleWrapper.DestinationRule.TrafficPolicy.PortLevelSettings[0]
				portUpdated := false
				for _, policy := range dr.DestinationRule.TrafficPolicy.PortLevelSettings {
					if policy.Port.Number == portTrafficPolicy.Port.Number {
						policy.Tls = portTrafficPolicy.Tls
						portUpdated = true
						break
					}
				}
				if portUpdated {
					continue
				}
				dr.DestinationRule.TrafficPolicy.PortLevelSettings = append(dr.DestinationRule.TrafficPolicy.PortLevelSettings, portTrafficPolicy)
			}
		}
	}

	out := make([]config.Config, 0, len(destinationRules))
	for _, dr := range destinationRules {
		sort.SliceStable(dr.DestinationRule.TrafficPolicy.PortLevelSettings, func(i, j int) bool {
			portI := dr.DestinationRule.TrafficPolicy.PortLevelSettings[i].Port
			portJ := dr.DestinationRule.TrafficPolicy.PortLevelSettings[j].Port
			if portI == nil && portJ == nil {
				return true
			} else if portI == nil {
				return true
			} else if portJ == nil {
				return false
			}
			return portI.Number < portJ.Number
		})
		drName := util.CreateDestinationRuleName(m.clusterId, dr.ServiceKey.Namespace, dr.ServiceKey.Name)
		out = append(out, config.Config{
			Meta: config.Meta{
				GroupVersionKind: gvk.DestinationRule,
				Name:             common.CreateConvertedName(constants.IstioIngressGatewayName, drName),
				Namespace:        m.namespace,
			},
			Spec: dr.DestinationRule,
		})
	}

	return out
}