pkg/adapter/dubboregistry/registrycenter.go (117 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dubboregistry import ( "os" "strconv" "strings" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry" _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/nacos" _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/zookeeper" "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension/adapter" "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/server" "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" ) func init() { adapter.RegisterAdapterPlugin(&Plugin{}) } var ( _ adapter.AdapterPlugin = new(Plugin) _ adapter.Adapter = new(Adapter) ) type ( // Plugin to monitor dubbo services on registry center Plugin struct { } AdaptorConfig struct { Registries map[string]model.Registry `yaml:"registries" json:"registries" mapstructure:"registries"` } ) // Kind returns the identifier of the plugin func (p Plugin) Kind() string { return constant.DubboRegistryCenterAdapter } // CreateAdapter returns the dubbo registry center adapter func (p *Plugin) CreateAdapter(a *model.Adapter) (adapter.Adapter, error) { adapter := &Adapter{id: a.ID, registries: make(map[string]registry.Registry), cfg: &AdaptorConfig{Registries: make(map[string]model.Registry)}} return adapter, nil } // Adapter to monitor dubbo services on registry center type Adapter struct { id string cfg *AdaptorConfig registries map[string]registry.Registry } // Start starts the adaptor func (a *Adapter) Start() { for _, reg := range a.registries { if err := reg.Subscribe(); err != nil { logger.Errorf("Subscribe fail, error is {%s}", err.Error()) } } } // Stop stops the adaptor func (a *Adapter) Stop() { for _, reg := range a.registries { if err := reg.Unsubscribe(); err != nil { logger.Errorf("Unsubscribe fail, error is {%s}", err.Error()) } } } // Apply inits the registries according to the configuration func (a *Adapter) Apply() error { // create registry per config nacosAddrFromEnv := os.Getenv(constant.EnvDubbogoPixiuNacosRegistryAddress) for k, registryConfig := range a.cfg.Registries { var err error if nacosAddrFromEnv != "" && registryConfig.Protocol == constant.Nacos { registryConfig.Address = nacosAddrFromEnv } a.registries[k], err = registry.GetRegistry(k, registryConfig, a) if err != nil { return err } } return nil } // Config returns the config of the adaptor func (a *Adapter) Config() any { return a.cfg } func (a *Adapter) OnAddAPI(r router.API) error { ipPort := strings.Split(r.IntegrationRequest.URL, ":") port, err := strconv.Atoi(ipPort[1]) if err != nil { return err } cluster := getClusterName(r) server.GetClusterManager().SetEndpoint(cluster, &model.Endpoint{ ID: r.IntegrationRequest.URL, Address: model.SocketAddress{ Address: ipPort[0], Port: port, }}, ) var match model.RouterMatch var path string if r.DubboBackendConfig.Method == constant.AnyValue { path = strings.Join([]string{r.ApplicationName, r.Interface}, constant.PathSlash) match = model.RouterMatch{Prefix: path, Methods: []string{string(r.HTTPVerb)}} } else { path = strings.Join([]string{r.ApplicationName, r.Interface, r.Method.Method}, constant.PathSlash) match = model.RouterMatch{Path: path, Methods: []string{string(r.HTTPVerb)}} } route := model.RouteAction{Cluster: cluster} added := &model.Router{ID: path, Match: match, Route: route} server.GetRouterManager().AddRouter(added) return server.GetApiConfigManager().AddAPI(a.id, r) } func (a *Adapter) OnRemoveAPI(r router.API) error { cluster := getClusterName(r) server.GetClusterManager().DeleteEndpoint(cluster, r.IntegrationRequest.URL) return server.GetApiConfigManager().RemoveAPI(a.id, r) } func (a *Adapter) OnDeleteRouter(r config.Resource) error { acm := server.GetApiConfigManager() return acm.DeleteRouter(a.id, r) } func getClusterName(r router.API) string { return strings.Join([]string{r.ApplicationName, r.Interface, r.Method.Method, r.Version, r.Group}, constant.PathSlash) }