pilot/pkg/xds/simple.go (84 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package xds import ( "net" ) import ( "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) import ( configaggregate "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/aggregate" "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory" "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate" controllermemory "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/serviceentry" "github.com/apache/dubbo-go-pixiu/pkg/config/mesh" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections" ) // SimpleServer represents the XDS serving feature of Istiod (pilot). // Unlike bootstrap/, this package has no dependencies on K8S, CA, // and other features. It'll be used initially in the istio-agent, // to provide a minimal proxy while reusing the same code as istiod. // Portions of the code will also be used in istiod - after it becomes // stable the plan is to refactor bootstrap to use this code instead // of directly bootstrapping XDS. // // The server supports proxy/federation of multiple sources - last part // or parity with MCP/Galley and MCP-over-XDS. 
type SimpleServer struct { // DiscoveryServer is the gRPC XDS implementation // Env and MemRegistry are available as fields, as well as the default // PushContext. DiscoveryServer *DiscoveryServer // MemoryStore is an in-memory config store, part of the aggregate store // used by the discovery server. MemoryConfigStore model.ConfigStore // GRPCListener is the listener used for GRPC. For agent it is // an insecure port, bound to 127.0.0.1 GRPCListener net.Listener // syncCh is used for detecting if the stores have synced, // which needs to happen before serving requests. syncCh chan string ConfigStoreCache model.ConfigStoreController } // Creates an basic, functional discovery server, using the same code as Istiod, but // backed by an in-memory config and endpoint stores. // // Can be used in tests, or as a minimal XDS discovery server with no dependency on K8S or // the complex bootstrap used by Istiod. A memory registry and memory config store are used to // generate the configs - they can be programmatically updated. func NewXDS(stop chan struct{}) *SimpleServer { // Prepare a working XDS server, with aggregate config and registry stores and a memory store for each. // TODO: refactor bootstrap code to use this server, and add more registries. env := &model.Environment{ PushContext: model.NewPushContext(), } env.Watcher = mesh.NewFixedWatcher(mesh.DefaultMeshConfig()) env.PushContext.Mesh = env.Watcher.Mesh() env.Init() ds := NewDiscoveryServer(env, "istiod", map[string]string{}) ds.InitGenerators(env, "dubbo-system") ds.CachesSynced() // Config will have a fixed format: // - aggregate store // - one primary (local) memory config // Additional stores can be added dynamically - for example by push or reference from a server. // This is used to implement and test XDS federation (which is not yet final). 
// In-memory config store, controller and istioConfigStore schemas := collections.Pilot store := memory.Make(schemas) s := &SimpleServer{ DiscoveryServer: ds, } s.syncCh = make(chan string, len(schemas.All())) configController := memory.NewController(store) s.MemoryConfigStore = model.MakeIstioStore(configController) // Endpoints/Clusters - using the config store for ServiceEntries serviceControllers := aggregate.NewController(aggregate.Options{}) serviceEntryController := serviceentry.NewController(configController, s.MemoryConfigStore, ds) serviceControllers.AddRegistry(serviceEntryController) sd := controllermemory.NewServiceDiscovery() sd.EDSUpdater = ds ds.MemRegistry = sd serviceControllers.AddRegistry(serviceregistry.Simple{ ProviderID: "Mem", ServiceDiscovery: sd, Controller: sd.Controller, }) env.ServiceDiscovery = serviceControllers go configController.Run(stop) // configStoreCache - with HasSync interface aggregateConfigController, err := configaggregate.MakeCache([]model.ConfigStoreController{ configController, }) if err != nil { log.Fatalf("Creating aggregate config: %v", err) } // TODO: fix the mess of store interfaces - most are too generic for their own good. s.ConfigStoreCache = aggregateConfigController env.ConfigStore = model.MakeIstioStore(aggregateConfigController) return s } func (s *SimpleServer) StartGRPC(addr string) (string, error) { lis, err := net.Listen("tcp", addr) if err != nil { return "", err } gs := grpc.NewServer() s.DiscoveryServer.Register(gs) reflection.Register(gs) s.GRPCListener = lis go func() { err = gs.Serve(lis) if err != nil { log.Infof("Serve done with %v", err) } }() return lis.Addr().String(), nil }