pkg/config/xds/cds.go (176 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package xds import ( "github.com/dubbo-go-pixiu/pixiu-api/pkg/api" xdspb "github.com/dubbo-go-pixiu/pixiu-api/pkg/xds/model" "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/config/xds/apiclient" "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/server/controls" ) type CdsManager struct { DiscoverApi clusterMg controls.ClusterManager } // Fetch overwrite DiscoverApi.Fetch. func (c *CdsManager) Fetch() error { r, err := c.DiscoverApi.Fetch("") //todo use local version if err != nil { return err } clusters := make([]*xdspb.Cluster, 0, len(r)) for _, one := range r { extClusters := &xdspb.PixiuExtensionClusters{} if err := one.To(extClusters); err != nil { logger.Errorf("unknown resource of %s, expect Listener", one.GetName()) continue } logger.Infof("clusters from xds server %v", extClusters) clusters = append(clusters, extClusters.Clusters...) } return c.setupCluster(clusters) } func (c *CdsManager) Delta() error { readCh, err := c.DiscoverApi.Delta() if err != nil { return err } go c.asyncHandler(readCh) return nil } func (c *CdsManager) asyncHandler(read chan *apiclient.DeltaResources) { for one := range read { clusters := make([]*xdspb.Cluster, 0, len(one.NewResources)) for _, one := range one.NewResources { cluster := &xdspb.PixiuExtensionClusters{} if err := one.To(cluster); err != nil { logger.Errorf("unknown resource of %s, expect Listener", one.GetName()) continue } logger.Infof("clusters from xds server %v", cluster) clusters = append(clusters, cluster.Clusters...) } if err := c.setupCluster(clusters); err != nil { logger.Errorf("can not setup cluster.", err) } } } func (c *CdsManager) removeCluster(clusterNames []string) { c.clusterMg.RemoveCluster(clusterNames) } func (c *CdsManager) setupCluster(clusters []*xdspb.Cluster) error { laterApplies := make([]func() error, 0, len(clusters)) toRemoveHash := make(map[string]struct{}, len(clusters)) store, err := c.clusterMg.CloneXdsControlStore() if err != nil { return errors.WithMessagef(err, "can not clone cluster store when update cluster") } //todo this will remove the cluster which defined locally. for _, cluster := range store.Config() { toRemoveHash[cluster.Name] = struct{}{} } for _, cluster := range clusters { delete(toRemoveHash, cluster.Name) makeCluster := c.makeCluster(cluster) switch { case c.clusterMg.HasCluster(cluster.Name): laterApplies = append(laterApplies, func() error { c.clusterMg.UpdateCluster(makeCluster) return nil }) default: laterApplies = append(laterApplies, func() error { c.clusterMg.AddCluster(makeCluster) return nil }) } } c.removeClusters(toRemoveHash) for _, fn := range laterApplies { //do update and add new cluster. if err := fn(); err != nil { logger.Errorf("can not modify cluster", err) } } return nil } func (c *CdsManager) removeClusters(toRemoveList map[string]struct{}) { removeClusters := make([]string, 0, len(toRemoveList)) for clusterName := range toRemoveList { removeClusters = append(removeClusters, clusterName) } if len(toRemoveList) == 0 { return } c.removeCluster(removeClusters) } func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.ClusterConfig { return &model.ClusterConfig{ Name: cluster.Name, TypeStr: cluster.TypeStr, Type: c.makeClusterType(cluster), EdsClusterConfig: c.makeEdsClusterConfig(cluster.EdsClusterConfig), LbStr: c.makeLoadBalancePolicy(cluster.LbStr), HealthChecks: c.makeHealthChecks(cluster.HealthChecks), Endpoints: c.makeEndpoints(cluster.Endpoints), } } func (c *CdsManager) makeLoadBalancePolicy(lb string) model.LbPolicyType { return model.LbPolicyTypeValue[lb] } func (c *CdsManager) makeClusterType(cluster *xdspb.Cluster) model.DiscoveryType { return model.DiscoveryTypeValue[cluster.TypeStr] } func (c *CdsManager) makeEndpoints(endpoints []*xdspb.Endpoint) []*model.Endpoint { r := make([]*model.Endpoint, len(endpoints)) for i, endpoint := range endpoints { r[i] = &model.Endpoint{ ID: endpoint.Id, Name: endpoint.Name, Address: c.makeAddress(endpoint), Metadata: endpoint.Metadata, } } return r } func (c *CdsManager) makeAddress(endpoint *xdspb.Endpoint) model.SocketAddress { if endpoint == nil || endpoint.Address == nil { return model.SocketAddress{} } return model.SocketAddress{ Address: endpoint.Address.Address, Port: int(endpoint.Address.Port), ResolverName: endpoint.Address.ResolverName, Domains: endpoint.Address.Domains, CertsDir: endpoint.Address.CertsDir, } } func (c *CdsManager) makeHealthChecks(checks []*xdspb.HealthCheck) (result []model.HealthCheckConfig) { //todo implement me after fix model.HealthCheck type define //result = make([]model.HealthCheck, 0, len(checks)) //for _, check := range checks { // switch one := check.GetChecker().(type) { // case *xdspb.HealthCheck_HttpChecker: // result = append(result, model.HttpHealthCheck{ // Host: one.HttpChecker.Host, // Path: one.HttpChecker.Path, // UseHttp2: one.HttpChecker.UseHttp2, // ExpectedStatuses: one.HttpChecker.ExpectedStatuses, // }) // case *xdspb.HealthCheck_GrpcChecker: // result = append(result, model.GrpcHealthCheck{ // ServiceName: one.GrpcChecker.ServiceName, // Authority: one.GrpcChecker.Authority, // }) // case *xdspb.HealthCheck_CustomChecker: // result = append(result, model.CustomHealthCheck{ // Name: one.CustomChecker.Name, // Config: func() interface{} { // if one.CustomChecker.Config == nil { // return nil // } // return one.CustomChecker.Config.AsMap() // }(), // }) // } //} return } func (c *CdsManager) makeEdsClusterConfig(edsConfig *xdspb.EdsClusterConfig) model.EdsClusterConfig { if edsConfig == nil { return model.EdsClusterConfig{} } return model.EdsClusterConfig{ EdsConfig: model.ConfigSource{ Path: edsConfig.EdsConfig.Path, ApiConfigSource: c.makeApiConfigSource(edsConfig.EdsConfig.ApiConfigSource), }, ServiceName: edsConfig.ServiceName, } } func (c *CdsManager) makeApiConfigSource(apiConfig *xdspb.ApiConfigSource) (result model.ApiConfigSource) { apiType, ok := model.ApiTypeValue[apiConfig.APITypeStr] if !ok { logger.Errorf("unknown apiType %s", apiConfig.APITypeStr) return } return model.ApiConfigSource{ APIType: api.ApiType(apiType), APITypeStr: apiConfig.APITypeStr, ClusterName: apiConfig.ClusterName, RefreshDelay: apiConfig.RefreshDelay, RequestTimeout: apiConfig.RequestTimeout, GrpcServices: nil, //todo create node of pb } }