pkg/authority/rule/connection/storage.go (228 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package connection import ( "errors" "io" "strconv" "sync" "sync/atomic" "time" "github.com/apache/dubbo-admin/pkg/authority/rule" "github.com/apache/dubbo-admin/pkg/logger" "k8s.io/client-go/util/workqueue" ) type Storage struct { Mutex *sync.RWMutex Connection []*Connection LatestRules map[string]rule.Origin } type EndpointConnection interface { Send(*ObserveResponse) error Recv() (*ObserveRequest, error) Disconnect() } type Connection struct { mutex *sync.RWMutex status ConnectionStatus EndpointConnection EndpointConnection Endpoint *rule.Endpoint TypeListened map[string]bool RawRuleQueue workqueue.Interface ExpectedRules map[string]*VersionedRule ClientRules map[string]*ClientStatus } type VersionedRule struct { Revision int64 Type string Data rule.ToClient } type PushingStatus int const ( Pushed PushingStatus = iota Pushing ) type ConnectionStatus int const ( Connected ConnectionStatus = iota Disconnected ) type ClientStatus struct { PushQueued bool PushingStatus PushingStatus NonceInc int64 ClientVersion *VersionedRule LastPushedTime int64 LastPushedVersion *VersionedRule LastPushNonce string } type ObserveResponse struct { Nonce string Type string Data rule.ToClient } type ObserveRequest struct { Nonce string Type string } func NewStorage() *Storage { return &Storage{ Mutex: &sync.RWMutex{}, Connection: []*Connection{}, LatestRules: map[string]rule.Origin{}, } } func (s *Storage) Connected(endpoint *rule.Endpoint, connection EndpointConnection) { s.Mutex.Lock() defer s.Mutex.Unlock() c := &Connection{ mutex: &sync.RWMutex{}, status: Connected, EndpointConnection: connection, Endpoint: endpoint, RawRuleQueue: workqueue.NewNamed("raw-rule"), TypeListened: map[string]bool{}, ExpectedRules: map[string]*VersionedRule{}, ClientRules: map[string]*ClientStatus{}, } s.Connection = append(s.Connection, c) go s.listenConnection(c) go c.listenRule() } func (s *Storage) listenConnection(c *Connection) { for { if c.status == Disconnected { return } req, err := c.EndpointConnection.Recv() if errors.Is(err, io.EOF) { logger.Sugar().Infof("Observe connection closed. Connection ID: %s", c.Endpoint.ID) s.Disconnect(c) return } if err != nil { logger.Sugar().Warnf("Observe connection error: %v. Connection ID: %s", err, c.Endpoint.ID) s.Disconnect(c) return } s.HandleRequest(c, req) } } func (s *Storage) HandleRequest(c *Connection, req *ObserveRequest) { if req.Type == "" { logger.Sugar().Errorf("Empty request type from %v", c.Endpoint.ID) return } if !TypeSupported(req.Type) { logger.Sugar().Errorf("Unsupported request type %s from %s", req.Type, c.Endpoint.ID) return } c.mutex.Lock() defer c.mutex.Unlock() if req.Nonce != "" { cr := c.ClientRules[req.Type] if cr == nil { logger.Sugar().Errorf("Unexpected request type %s with nonce %s from %s", req.Type, req.Nonce, c.Endpoint.ID) return } if cr.PushingStatus == Pushing { if cr.LastPushNonce != req.Nonce { logger.Sugar().Errorf("Unexpected request nonce %s from %s", req.Nonce, c.Endpoint.ID) return } cr.ClientVersion = cr.LastPushedVersion cr.PushingStatus = Pushed logger.Sugar().Infof("Client %s pushed %s rule %s success", c.Endpoint.Ips, req.Type, cr.ClientVersion.Revision) } return } if _, ok := c.TypeListened[req.Type]; !ok { logger.Sugar().Infof("Client %s listen %s rule", c.Endpoint.Ips, req.Type) c.TypeListened[req.Type] = true c.ClientRules[req.Type] = &ClientStatus{ PushingStatus: Pushed, NonceInc: 0, ClientVersion: &VersionedRule{ Revision: -1, Type: req.Type, }, LastPushedTime: 0, LastPushedVersion: nil, LastPushNonce: "", } latestRule := s.LatestRules[req.Type] if latestRule != nil { c.RawRuleQueue.Add(latestRule) } } } func (c *Connection) listenRule() { for { obj, shutdown := c.RawRuleQueue.Get() if shutdown { return } func(obj interface{}) { defer c.RawRuleQueue.Done(obj) var key rule.Origin var ok bool if key, ok = obj.(rule.Origin); !ok { logger.Sugar().Errorf("expected rule.Origin in workqueue but got %#v", obj) return } if err := c.handleRule(key); err != nil { logger.Sugar().Errorf("error syncing '%s': %s", key, err.Error()) return } logger.Sugar().Infof("Successfully synced '%s'", key) }(obj) } } func (c *Connection) handleRule(rawRule rule.Origin) error { targetRule, err := rawRule.Exact(c.Endpoint) if err != nil { return err } if _, ok := c.TypeListened[targetRule.Type()]; !ok { return nil } cr := c.ClientRules[targetRule.Type()] for cr.PushingStatus == Pushing { cr.PushQueued = true time.Sleep(1 * time.Second) logger.Sugar().Infof("Client %s %s rule is pushing, wait for 1 second", c.Endpoint.Ips, targetRule.Type()) } cr.PushQueued = false if cr.ClientVersion.Data != nil && (cr.ClientVersion.Data.Data() == targetRule.Data() || cr.ClientVersion.Data.Revision() >= targetRule.Revision()) { logger.Sugar().Infof("Client %s %s rule is up to date", c.Endpoint.Ips, targetRule.Type()) return nil } newVersion := atomic.AddInt64(&cr.NonceInc, 1) r := &ObserveResponse{ Nonce: strconv.FormatInt(newVersion, 10), Type: targetRule.Type(), Data: targetRule, } cr.LastPushedTime = time.Now().Unix() cr.LastPushedVersion = &VersionedRule{ Type: targetRule.Type(), Revision: targetRule.Revision(), Data: targetRule, } cr.LastPushNonce = r.Nonce cr.PushingStatus = Pushing logger.Sugar().Infof("Receive new version rule. Client %s %s rule is pushing.", c.Endpoint.Ips, targetRule.Type()) return c.EndpointConnection.Send(r) } func TypeSupported(t string) bool { return t == "authentication/v1beta1" || t == "authorization/v1beta1" } func (s *Storage) Disconnect(c *Connection) { for i, sc := range s.Connection { if sc == c { s.Connection = append(s.Connection[:i], s.Connection[i+1:]...) break } } c.EndpointConnection.Disconnect() c.RawRuleQueue.ShutDown() }