in pkg/providers/apisix/translation/apisix_route.go [512:576]
func (t *translator) translateStreamRouteV2(ctx *translation.TranslateContext, ar *configv2.ApisixRoute) error {
ruleNameMap := make(map[string]struct{})
for _, part := range ar.Spec.Stream {
if _, ok := ruleNameMap[part.Name]; ok {
return errors.New("duplicated route rule name")
}
ruleNameMap[part.Name] = struct{}{}
backend := part.Backend
svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2(backend, ar.Namespace)
if err != nil {
log.Errorw("failed to get service port in backend",
zap.Any("backend", backend),
zap.Any("apisix_route", ar),
zap.Error(err),
)
return err
}
// add stream route plugins
pluginMap := make(apisixv1.Plugins)
for _, plugin := range part.Plugins {
if !plugin.Enable {
continue
}
if plugin.Config != nil {
if plugin.SecretRef != "" {
sec, err := t.SecretLister.Secrets(ar.Namespace).Get(plugin.SecretRef)
if err != nil {
log.Errorw("The config secretRef is invalid",
zap.Any("plugin", plugin.Name),
zap.String("secretRef", plugin.SecretRef))
break
}
log.Debugw("Add new items, then override items with the same plugin key",
zap.Any("plugin", plugin.Name),
zap.String("secretRef", plugin.SecretRef))
for key, value := range sec.Data {
utils.InsertKeyInMap(key, string(value), plugin.Config)
}
}
pluginMap[plugin.Name] = plugin.Config
} else {
pluginMap[plugin.Name] = make(map[string]interface{})
}
}
sr := apisixv1.NewDefaultStreamRoute()
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
sr.ID = id.GenID(name)
sr.ServerPort = part.Match.IngressPort
sr.SNI = part.Match.Host
ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
sr.UpstreamId = ups.ID
sr.Plugins = pluginMap
ctx.AddStreamRoute(sr)
if !ctx.CheckUpstreamExist(ups.Name) {
ctx.AddUpstream(ups)
}
}
return nil
}