pkg/server/cluster_manager.go (237 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package server import ( "fmt" "sync" "sync/atomic" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/cluster" "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer" "github.com/apache/dubbo-go-pixiu/pkg/common/yaml" "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/server/controls" ) // generate cluster name for unnamed cluster var clusterIndex int32 = 1 type ( ClusterManager struct { rw sync.RWMutex store *ClusterStore //cConfig []*model.ClusterConfig } // ClusterStore store for cluster array ClusterStore struct { Config []*model.ClusterConfig `yaml:"config" json:"config"` Version int32 `yaml:"version" json:"version"` clustersMap map[string]*cluster.Cluster } // xdsControlStore help convert ClusterStore to controls.ClusterStore interface xdsControlStore struct { *ClusterStore } ) func (x *xdsControlStore) Config() []*model.ClusterConfig { return x.ClusterStore.Config } // CloneXdsControlStore clone cluster store for xds func (cm *ClusterManager) CloneXdsControlStore() (controls.ClusterStore, error) { store, err := cm.CloneStore() return &xdsControlStore{store}, err } func CreateDefaultClusterManager(bs *model.Bootstrap) *ClusterManager { return &ClusterManager{store: newClusterStore(bs)} } func newClusterStore(bs *model.Bootstrap) *ClusterStore { store := &ClusterStore{ clustersMap: map[string]*cluster.Cluster{}, } for _, c := range bs.StaticResources.Clusters { store.AddCluster(c) } return store } func (cm *ClusterManager) AddCluster(c *model.ClusterConfig) { cm.rw.Lock() defer cm.rw.Unlock() cm.store.IncreaseVersion() cm.store.AddCluster(c) } func (cm *ClusterManager) UpdateCluster(new *model.ClusterConfig) { cm.rw.Lock() defer cm.rw.Unlock() cm.store.IncreaseVersion() cm.store.UpdateCluster(new) } func (cm *ClusterManager) SetEndpoint(clusterName string, endpoint *model.Endpoint) { cm.rw.Lock() defer cm.rw.Unlock() cm.store.IncreaseVersion() cm.store.SetEndpoint(clusterName, endpoint) } func (cm *ClusterManager) DeleteEndpoint(clusterName string, endpointID string) { cm.rw.Lock() defer cm.rw.Unlock() cm.store.IncreaseVersion() cm.store.DeleteEndpoint(clusterName, endpointID) } func (cm *ClusterManager) CloneStore() (*ClusterStore, error) { cm.rw.Lock() defer cm.rw.Unlock() b, err := yaml.MarshalYML(cm.store) if err != nil { return nil, err } c := &ClusterStore{ clustersMap: map[string]*cluster.Cluster{}, } if err := yaml.UnmarshalYML(b, c); err != nil { return nil, err } return c, nil } func (cm *ClusterManager) NewStore(version int32) *ClusterStore { cm.rw.Lock() defer cm.rw.Unlock() return &ClusterStore{Version: version, clustersMap: map[string]*cluster.Cluster{}} } func (cm *ClusterManager) CompareAndSetStore(store *ClusterStore) bool { cm.rw.Lock() defer cm.rw.Unlock() if store.Version != cm.store.Version { return false } cm.store = store return true } func (cm *ClusterManager) PickEndpoint(clusterName string, policy model.LbPolicy) *model.Endpoint { cm.rw.RLock() defer cm.rw.RUnlock() for _, c := range cm.store.Config { if c.Name == clusterName { return cm.pickOneEndpoint(c, policy) } } return nil } func (cm *ClusterManager) pickOneEndpoint(c *model.ClusterConfig, policy model.LbPolicy) *model.Endpoint { if len(c.Endpoints) == 0 { return nil } if len(c.Endpoints) == 1 { if !c.Endpoints[0].UnHealthy { return c.Endpoints[0] } return nil } loadBalancer, ok := loadbalancer.LoadBalancerStrategy[c.LbStr] if ok { return loadBalancer.Handler(c, policy) } return loadbalancer.LoadBalancerStrategy[model.LoadBalancerRand].Handler(c, policy) } func (cm *ClusterManager) RemoveCluster(namesToDel []string) { cm.rw.Lock() defer cm.rw.Unlock() for i, c := range cm.store.Config { if c == nil { continue } for _, name := range namesToDel { // suppose resource to remove and clusters is few if name == c.Name { removed := cm.store.Config[i] cm.store.clustersMap[removed.Name].Stop() cm.store.Config[i] = nil delete(cm.store.clustersMap, removed.Name) } } } //re-construct cm.store.Config remove nil element for i := 0; i < len(cm.store.Config); { if cm.store.Config[i] != nil { i++ continue } cm.store.Config = append(cm.store.Config[:i], cm.store.Config[i+1:]...) } cm.store.IncreaseVersion() } func (cm *ClusterManager) HasCluster(clusterName string) bool { cm.rw.Lock() defer cm.rw.Unlock() return cm.store.HasCluster(clusterName) } func (s *ClusterStore) AddCluster(c *model.ClusterConfig) { if c.Name == "" { index := atomic.AddInt32(&clusterIndex, 1) c.Name = fmt.Sprintf("cluster%d", index) } s.Config = append(s.Config, c) s.clustersMap[c.Name] = cluster.NewCluster(c) c.CreateConsistentHash() } func (s *ClusterStore) UpdateCluster(new *model.ClusterConfig) { for i, c := range s.Config { if c.Name == new.Name { s.Config[i] = new return } } logger.Warnf("not found modified cluster %s", new.Name) } func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint) { cluster := s.clustersMap[clusterName] if cluster == nil { c := &model.ClusterConfig{Name: clusterName, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{}} s.AddCluster(c) cluster = s.clustersMap[clusterName] } for _, c := range s.Config { if c.Name == clusterName { for _, e := range c.Endpoints { // endpoint update if e.ID == endpoint.ID { cluster.RemoveEndpoint(e) e.Name = endpoint.Name e.Metadata = endpoint.Metadata e.Address = endpoint.Address cluster.AddEndpoint(e) return } } // endpoint create c.Endpoints = append(c.Endpoints, endpoint) cluster.AddEndpoint(endpoint) if c.ConsistentHash.Hash != nil { c.ConsistentHash.Hash.Add(endpoint.GetHost()) } return } } } func (s *ClusterStore) DeleteEndpoint(clusterName string, endpointID string) { cluster := s.clustersMap[clusterName] if cluster == nil { return } for _, c := range s.Config { if c.Name == clusterName { for i, e := range c.Endpoints { if e.ID == endpointID { cluster.RemoveEndpoint(e) c.Endpoints = append(c.Endpoints[:i], c.Endpoints[i+1:]...) if c.ConsistentHash.Hash != nil { c.ConsistentHash.Hash.Remove(e.GetHost()) } return } } logger.Warnf("not found endpoint %s", endpointID) return } } logger.Warnf("not found cluster %s", clusterName) } func (s *ClusterStore) HasCluster(clusterName string) bool { for _, c := range s.Config { if c.Name == clusterName { return true } } return false } func (s *ClusterStore) IncreaseVersion() { atomic.AddInt32(&s.Version, 1) }