pkg/config/xds/apiclient/grpc.go (313 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package apiclient import ( "context" stderr "errors" "sync" "time" ) import ( envoyconfigcorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" extensionpb "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/anypb" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/server/controls" ) // agent name to talk with xDS server const xdsAgentName = "dubbo-go-pixiu" var ( grpcMg *GRPCClusterManager ErrClusterNotFound = stderr.New("can not find cluster") ) type ( GrpcExtensionApiClient struct { config model.ApiConfigSource grpcMg *GRPCClusterManager node *model.Node xDSExtensionClient extensionpb.ExtensionConfigDiscoveryServiceClient resourceNames []ResourceTypeName typeUrl string exitCh chan struct{} } xdsState struct { nonce string deltaVersion map[string]string versionInfo string } ) func Init(clusterMg controls.ClusterManager) { grpcMg = &GRPCClusterManager{ clusters: &sync.Map{}, clusterMg: clusterMg, } } func Stop() { if err := grpcMg.Close(); err != nil { //todo logger.Errorf("grpcClusterManager close failed. %v", err) } } // CreateGrpExtensionApiClient create Grpc type ApiClient func CreateGrpExtensionApiClient(config *model.ApiConfigSource, node *model.Node, exitCh chan struct{}, typeName ResourceTypeName) *GrpcExtensionApiClient { v := &GrpcExtensionApiClient{ config: *config, node: node, typeUrl: typeName, grpcMg: grpcMg, exitCh: exitCh, } v.init() return v } // Fetch get config data from discovery service and return Any type. func (g *GrpcExtensionApiClient) Fetch(localVersion string) ([]*ProtoAny, error) { clsRsp, err := g.xDSExtensionClient.FetchExtensionConfigs(context.Background(), &discoverypb.DiscoveryRequest{ VersionInfo: localVersion, Node: g.makeNode(), ResourceNames: g.resourceNames, TypeUrl: resource.ExtensionConfigType, //"type.googleapis.com/pixiu.config.listener.v3.Listener", //resource.ListenerType, }) if err != nil { return nil, errors.Wrapf(err, "fetch dynamic resource from remote error. %s", g.resourceNames) } logger.Infof("init the from xds server typeUrl=%s version=%s", clsRsp.TypeUrl, clsRsp.VersionInfo) extensions := make([]*ProtoAny, 0, len(clsRsp.Resources)) for _, clsResource := range clsRsp.Resources { elems, err := g.decodeSource(clsResource) if err != nil { return nil, err } extensions = append(extensions, elems) } return extensions, nil } func (g *GrpcExtensionApiClient) decodeSource(resource *anypb.Any) (*ProtoAny, error) { extension := envoyconfigcorev3.TypedExtensionConfig{} err := resource.UnmarshalTo(&extension) if err != nil { return nil, errors.Wrapf(err, "typed extension as expected.(%s)", g.resourceNames) } elems := &ProtoAny{typeConfig: &extension} return elems, nil } func (g *GrpcExtensionApiClient) makeNode() *envoyconfigcorev3.Node { return &envoyconfigcorev3.Node{ Id: g.node.Id, Cluster: g.node.Cluster, UserAgentName: xdsAgentName, } } func (g *GrpcExtensionApiClient) Delta() (chan *DeltaResources, error) { outputCh := make(chan *DeltaResources) return outputCh, g.runDelta(outputCh) } func (g *GrpcExtensionApiClient) runDelta(output chan<- *DeltaResources) error { var delta extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient var cancel context.CancelFunc var xState xdsState backoff := func() { for { //back off var err error var ctx context.Context // context to sync exitCh ctx, cancel = context.WithCancel(context.TODO()) delta, err = g.sendInitDeltaRequest(ctx, &xState) if err != nil { logger.Error("can not receive delta discovery request, will back off 1 sec later", err) select { case <-time.After(1 * time.Second): case <-g.exitCh: logger.Infof("get close single.") return } continue //backoff } return //success } } backoff() if delta == nil { // delta instance not created because exitCh return nil } go func() { //waiting exitCh close for range g.exitCh { } cancel() }() //get message go func() { for { // delta response backoff. for { //loop consume recv data form xds server(sendInitDeltaRequest) resp, err := delta.Recv() if err != nil { //todo backoff retry logger.Error("can not receive delta discovery request", err) break } g.handleDeltaResponse(resp, &xState, output) err = g.subscribeOnGoingChang(delta, &xState) if err != nil { logger.Error("can not recv delta discovery request", err) break } } backoff() } }() return nil } func (g *GrpcExtensionApiClient) handleDeltaResponse(resp *discoverypb.DeltaDiscoveryResponse, xState *xdsState, output chan<- *DeltaResources) { // save the xds state xState.deltaVersion = make(map[string]string, 1) xState.nonce = resp.Nonce resources := &DeltaResources{ NewResources: make([]*ProtoAny, 0, 1), RemovedResource: make([]string, 0, 1), } logger.Infof("get xDS message nonce, %s", resp.Nonce) for _, res := range resp.RemovedResources { logger.Infof("remove resource found ", res) resources.RemovedResource = append(resources.RemovedResource, res) } for _, res := range resp.Resources { logger.Infof("new resource found %s version=%s", res.Name, res.Version) xState.deltaVersion[res.Name] = res.Version elems, err := g.decodeSource(res.Resource) if err != nil { logger.Infof("can not decode source %s version=%s", res.Name, res.Version, err) } resources.NewResources = append(resources.NewResources, elems) } //notify the resource change handler output <- resources } func (g *GrpcExtensionApiClient) subscribeOnGoingChang(delta extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient, xState *xdsState) error { err := delta.Send(&discoverypb.DeltaDiscoveryRequest{ Node: g.makeNode(), TypeUrl: resource.ExtensionConfigType, InitialResourceVersions: xState.deltaVersion, ResponseNonce: xState.nonce, }) return err } func (g *GrpcExtensionApiClient) sendInitDeltaRequest(ctx context.Context, xState *xdsState) (extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient, error) { delta, err := g.xDSExtensionClient.DeltaExtensionConfigs(ctx) if err != nil { return nil, errors.Wrapf(err, "can not start delta stream with xds server ") } err = delta.Send(&discoverypb.DeltaDiscoveryRequest{ Node: g.makeNode(), TypeUrl: resource.ExtensionConfigType, ResourceNamesSubscribe: []string{g.typeUrl}, ResourceNamesUnsubscribe: nil, InitialResourceVersions: xState.deltaVersion, ResponseNonce: xState.nonce, ErrorDetail: nil, }) if err != nil { return nil, errors.Wrapf(err, "can not send delta discovery request") } return delta, nil } func (g *GrpcExtensionApiClient) init() { if len(g.config.ClusterName) == 0 { panic("should config one cluster at least") } //todo implement multiple grpc api services if len(g.config.ClusterName) > 1 { logger.Warnf("defined multiple cluster for xDS api services but only one support.") } cluster, err := g.grpcMg.GetGrpcCluster(g.config.ClusterName[0]) if err != nil { logger.Errorf("get cluster for init error. error=%v", err) panic(err) } conn, err := cluster.GetConnection() if err != nil { panic(err) } g.xDSExtensionClient = extensionpb.NewExtensionConfigDiscoveryServiceClient(conn) } type GRPCClusterManager struct { clusters *sync.Map // map[clusterName]*grpcCluster clusterMg controls.ClusterManager } type GRPCCluster struct { name string //cluster name config *model.ClusterConfig once sync.Once conn *grpc.ClientConn } // GetGrpcCluster get the cluster or create it first time. func (g *GRPCClusterManager) GetGrpcCluster(name string) (*GRPCCluster, error) { if !g.clusterMg.HasCluster(name) { return nil, errors.Wrapf(ErrClusterNotFound, "name = %s", name) } if load, ok := g.clusters.Load(name); ok { grpcCluster := load.(*GRPCCluster) // grpcClusterManager only return grpcCluster, nil } store, err := g.clusterMg.CloneXdsControlStore() if err != nil { return nil, errors.WithMessagef(err, "clone cluster store failed") } var clusterCfg *model.ClusterConfig for _, cfg := range store.Config() { if cfg.Name == name { clusterCfg = cfg break } } if clusterCfg == nil { return nil, errors.Wrapf(ErrClusterNotFound, "name of %s", name) } newCluster := &GRPCCluster{ name: name, config: clusterCfg, } g.clusters.Store(name, newCluster) return newCluster, nil } func (g *GRPCClusterManager) Close() (err error) { //todo enhance the close process when concurrent g.clusters.Range(func(_, value any) bool { if conn := value.(*grpc.ClientConn); conn != nil { if err = conn.Close(); err != nil { logger.Errorf("can not close grpc connection.", err) } } return true }) return nil } func (g *GRPCCluster) GetConnection() (conn *grpc.ClientConn, err error) { g.once.Do(func() { creds := insecure.NewCredentials() //if *xdsCreds { // todo // log.Println("Using xDS credentials...") // var err error // if creds, err = xdscreds.NewClientCredentials(xdscreds.ClientOptions{FallbackCreds: insecure.NewCredentials()}); err != nil { // log.Fatalf("failed to create client-side xDS credentials: %v", err) // } //} if len(g.config.Endpoints) == 0 { err = errors.Errorf("expect endpoint.") return } endpoint := g.config.Endpoints[0].Address.GetAddress() logger.Infof("to connect xds server %s ...", endpoint) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) //todo fix timeout cancel warning defer cancel() conn, err = grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock(), ) if err != nil { err = errors.Errorf("grpc.Dial(%s) failed: %v", endpoint, err) return } logger.Infof("connected xds server (%s)", endpoint) g.conn = conn }) return g.conn, nil } func (g *GRPCCluster) IsAlive() bool { return g.conn.GetState() == connectivity.Ready } func (g *GRPCCluster) Close() error { if err := g.conn.Close(); err != nil { return errors.Wrapf(err, "can not close. %v", g.config) } return nil }