xds/client/controller/transport.go (306 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * * Copyright 2021 gRPC authors. * */ package controller import ( "context" "fmt" "time" ) import ( "github.com/golang/protobuf/proto" "google.golang.org/grpc" ) import ( resourceversion "dubbo.apache.org/dubbo-go/v3/xds/client/controller/version" "dubbo.apache.org/dubbo-go/v3/xds/client/load" "dubbo.apache.org/dubbo-go/v3/xds/client/resource" ) // AddWatch adds a watch for an xDS resource given its type and name. func (t *Controller) AddWatch(rType resource.ResourceType, resourceName string) { t.sendCh.Put(&watchAction{ rType: rType, remove: false, resource: resourceName, }) } // RemoveWatch cancels an already registered watch for an xDS resource // given its type and name. func (t *Controller) RemoveWatch(rType resource.ResourceType, resourceName string) { t.sendCh.Put(&watchAction{ rType: rType, remove: true, resource: resourceName, }) } // run starts an ADS stream (and backs off exponentially, if the previous // stream failed without receiving a single reply) and runs the sender and // receiver routines to send and receive data from the stream respectively. 
func (t *Controller) run(ctx context.Context) {
	// The sender runs for the lifetime of ctx and picks up every stream this
	// loop creates via t.streamCh.
	go t.send(ctx)
	// TODO: start a goroutine monitoring ClientConn's connectivity state, and
	// report error (and log) when state is transient failure.

	retries := 0
	for {
		// Non-blocking exit check before each attempt.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if retries != 0 {
			// Wait out the backoff, but stop promptly if ctx is cancelled.
			timer := time.NewTimer(t.backoff(retries))
			select {
			case <-timer.C:
			case <-ctx.Done():
				// Drain the channel if the timer already fired, so no value
				// is left behind.
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		}

		retries++
		stream, err := t.vClient.NewStream(ctx, t.cc)
		if err != nil {
			t.updateHandler.NewConnectionError(err)
			t.logger.Warnf("xds: ADS stream creation failed: %v", err)
			continue
		}
		t.logger.Infof("ADS stream created")

		// Discard any stream the sender has not picked up yet so the send
		// below does not block. NOTE(review): this assumes t.streamCh has a
		// buffer of exactly one — confirm where it is created.
		select {
		case <-t.streamCh:
		default:
		}
		t.streamCh <- stream
		// recv blocks until the stream fails. Reset the backoff if the stream
		// handled at least one response successfully.
		if t.recv(stream) {
			retries = 0
		}
	}
}

// send is a separate goroutine for sending watch requests on the xds stream.
//
// It watches the stream channel for new streams, and the request channel for
// new requests to send on the stream.
//
// For each new request (watchAction), it's
//   - processed and added to the watch map
//   - so resend will pick them up when there are new streams
//   - sent on the current stream if there's one
//   - the current stream is cleared when any send on it fails
//
// ACK/NACK requests (ackAction) are filtered by processAckInfo first and are
// dropped when they belong to a stale stream or when no watch is active.
//
// For each new stream, all the existing requests will be resent.
//
// Note that this goroutine doesn't do anything to the old stream when there's a
// new one. In fact, there should be only one stream in progress, and new one
// should only be created when the old one fails (recv returns an error).
func (t *Controller) send(ctx context.Context) {
	// stream is the stream requests are currently sent on; nil means there
	// is no usable stream.
	var stream grpc.ClientStream
	for {
		select {
		case <-ctx.Done():
			return
		case stream = <-t.streamCh:
			if !t.sendExisting(stream) {
				// send failed, clear the current stream.
				stream = nil
			}
		case u := <-t.sendCh.Get():
			// NOTE(review): sendCh looks like grpc's buffer.Unbounded, whose
			// contract requires Load after each receive from Get so the next
			// queued item is delivered — confirm.
			t.sendCh.Load()

			var (
				target                 []string
				rType                  resource.ResourceType
				version, nonce, errMsg string
				send                   bool
			)
			switch update := u.(type) {
			case *watchAction:
				// Watch changes always update the watch map, even with no stream.
				target, rType, version, nonce = t.processWatchInfo(update)
			case *ackAction:
				target, rType, version, nonce, send = t.processAckInfo(update, stream)
				if !send {
					continue
				}
				errMsg = update.errMsg
			}
			if stream == nil {
				// There's no stream yet. Skip the request. This request
				// will be resent to the new streams. If no stream is
				// created, the watcher will timeout (same as server not
				// sending response back).
				continue
			}
			if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
				t.logger.Warnf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
				// send failed, clear the current stream.
				stream = nil
			}
		}
	}
}

// sendExisting sends out xDS requests for registered watchers when recovering
// from a broken stream. It first clears the recorded versions and nonces, so
// every request carries an empty version and nonce. It returns false as soon as
// one send fails, and true if a request was sent for every watched resource
// type.
//
// We call stream.Send() here with the lock being held. It should be OK to do
// that here because the stream has just started and Send() usually returns
// quickly (once it pushes the message onto the transport layer) and is only
// ever blocked if we don't have enough flow control quota.
func (t *Controller) sendExisting(stream grpc.ClientStream) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Reset the ack versions when the stream restarts.
	t.versionMap = make(map[resource.ResourceType]string)
	t.nonceMap = make(map[resource.ResourceType]string)

	for rType, s := range t.watchMap {
		if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
			t.logger.Warnf("ADS request failed: %v", err)
			return false
		}
	}

	return true
}

// recv receives xDS responses on the provided ADS stream and branches out to
// message specific handlers. For every response of a supported resource type
// it queues an ACK (or, if handling failed, a NACK) on t.sendCh tagged with
// stream. It returns once receiving fails. The return value reports whether
// any response on this stream was handled successfully.
func (t *Controller) recv(stream grpc.ClientStream) bool {
	// success records whether at least one response on this stream was
	// handled without error (and therefore ACKed). run uses it to decide
	// whether to reset its backoff.
	success := false
	for {
		resp, err := t.vClient.RecvResponse(stream)
		if err != nil {
			t.updateHandler.NewConnectionError(err)
			t.logger.Warnf("ADS stream is closed with error: %v", err)
			return success
		}

		rType, version, nonce, err := t.handleResponse(resp)
		if e, ok := err.(resourceversion.ErrResourceTypeUnsupported); ok {
			// Responses for resource types this client does not know are
			// neither ACKed nor NACKed.
			t.logger.Warnf("%s", e.ErrStr)
			continue
		}
		if err != nil {
			// NACK: an empty version makes processAckInfo re-send the last
			// ACKed version together with the error message.
			t.sendCh.Put(&ackAction{
				rType:   rType,
				version: "",
				nonce:   nonce,
				errMsg:  err.Error(),
				stream:  stream,
			})
			t.logger.Warnf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
			continue
		}

		t.sendCh.Put(&ackAction{
			rType:   rType,
			version: version,
			nonce:   nonce,
			stream:  stream,
		})
		t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce)
		success = true
	}
}

// handleResponse parses an ADS response, unmarshals its resources according to
// their type, and hands the result to the matching t.updateHandler method.
//
// It returns the response's resource type, version and nonce, and any error.
// If parsing fails, the error is returned and no handler is called. If
// unmarshalling reports an error, the handler is still called with the
// (partial) update and metadata, and the error is also returned so the caller
// can NACK. For an unrecognized resource type it returns an
// ErrResourceTypeUnsupported with an empty version and nonce.
func (t *Controller) handleResponse(resp proto.Message) (resource.ResourceType, string, string, error) {
	rType, resources, version, nonce, err := t.vClient.ParseResponse(resp)
	if err != nil {
		return rType, version, nonce, err
	}

	opts := &resource.UnmarshalOptions{
		Version:         version,
		Resources:       resources,
		Logger:          t.logger,
		UpdateValidator: t.updateValidator,
	}
	var md resource.UpdateMetadata
	switch rType {
	case resource.ListenerResource:
		var update map[string]resource.ListenerUpdateErrTuple
		update, md, err = resource.UnmarshalListener(opts)
		t.updateHandler.NewListeners(update, md)
	case resource.RouteConfigResource:
		var update map[string]resource.RouteConfigUpdateErrTuple
		update, md, err = resource.UnmarshalRouteConfig(opts)
		t.updateHandler.NewRouteConfigs(update, md)
	case resource.ClusterResource:
		var update map[string]resource.ClusterUpdateErrTuple
		update, md, err = resource.UnmarshalCluster(opts)
		t.updateHandler.NewClusters(update, md)
	case resource.EndpointsResource:
		var update map[string]resource.EndpointsUpdateErrTuple
		update, md, err = resource.UnmarshalEndpoints(opts)
		t.updateHandler.NewEndpoints(update, md)
	default:
		return rType, "", "", resourceversion.ErrResourceTypeUnsupported{
			ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", rType),
		}
	}
	return rType, version, nonce, err
}

// mapToSlice returns the keys of m as a new slice, in map iteration order
// (i.e. unspecified). A nil or empty map yields an empty, non-nil slice.
func mapToSlice(m map[string]bool) []string {
	ret := make([]string, 0, len(m))
	for name := range m {
		ret = append(ret, name)
	}
	return ret
}

// watchAction is a queued request to add or remove a watch.
type watchAction struct {
	rType    resource.ResourceType
	remove   bool // Whether this is to remove watch for the resource.
	resource string
}

// processWatchInfo pulls the fields needed by the request from a watchAction.
//
// It also updates the watch map: adding a watch creates the per-type set on
// demand, and removing the last watch of a type deletes that type's entry. The
// returned target is the full set of names still watched for the type, which
// is empty when the last one was removed.
func (t *Controller) processWatchInfo(w *watchAction) (target []string, rType resource.ResourceType, ver, nonce string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	current := t.watchMap[w.rType]
	if w.remove {
		// delete on a nil map is a no-op, so removing a watch for a type
		// that has none needs no temporary set.
		delete(current, w.resource)
		if len(current) == 0 {
			delete(t.watchMap, w.rType)
		}
	} else {
		if current == nil {
			current = make(map[string]bool)
			t.watchMap[w.rType] = current
		}
		current[w.resource] = true
	}

	rType = w.rType
	target = mapToSlice(current)
	// We don't reset version or nonce when a new watch is started. The version
	// and nonce from previous response are carried by the request unless the
	// stream is recreated.
	ver = t.versionMap[rType]
	nonce = t.nonceMap[rType]
	return target, rType, ver, nonce
}

// ackAction is a queued ACK or NACK for a response received on a stream.
type ackAction struct {
	rType   resource.ResourceType
	version string // NACK if version is an empty string.
	nonce   string
	errMsg  string // Empty unless it's a NACK.
	// ACK/NACK are tagged with the stream it's for. When the stream is down,
	// all the ACK/NACK for this stream will be dropped, and the version/nonce
	// won't be updated.
	stream grpc.ClientStream
}

// processAckInfo pulls the fields needed by the ack request from an ackAction.
//
// Unless the ack belongs to a stale stream, it records the ack's nonce. It
// returns send=false when the ack's stream is not the current stream, or when
// there is no active watch for the ack's resource type. For a NACK (empty
// version), the returned version is the last ACKed version for the type.
func (t *Controller) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType resource.ResourceType, version, nonce string, send bool) { if ack.stream != stream { // If ACK's stream isn't the current sending stream, this means the ACK // was pushed to queue before the old stream broke, and a new stream has // been started since. Return immediately here so we don't update the // nonce for the new stream. return nil, resource.UnknownResource, "", "", false } rType = ack.rType t.mu.Lock() defer t.mu.Unlock() // Update the nonce no matter if we are going to send the ACK request on // wire. We may not send the request if the watch is canceled. But the nonce // needs to be updated so the next request will have the right nonce. nonce = ack.nonce t.nonceMap[rType] = nonce s, ok := t.watchMap[rType] if !ok || len(s) == 0 { // We don't send the request ack if there's no active watch (this can be // either the server sends responses before any request, or the watch is // canceled while the ackAction is in queue), because there's no resource // name. And if we send a request with empty resource name list, the // server may treat it as a wild card and send us everything. return nil, resource.UnknownResource, "", "", false } send = true target = mapToSlice(s) version = ack.version if version == "" { // This is a nack, get the previous acked version. version = t.versionMap[rType] // version will still be an empty string if rType isn't // found in versionMap, this can happen if there wasn't any ack // before. } else { t.versionMap[rType] = version } return target, rType, version, nonce, send } // reportLoad starts an LRS stream to report load data to the management server. // It blocks until the context is canceled. 
func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts resourceversion.LoadReportingOptions) {
	// runStream performs one LRS stream attempt. It creates the stream, sends
	// the first request, reads the server's first response (clusters and
	// interval), and then reports loads until a send fails or ctx is done. It
	// returns true only if that handshake succeeded, which resets the backoff.
	runStream := func() bool {
		// Each attempt gets its own context that is always cancelled when the
		// attempt ends. Without it, a stream abandoned after a failed send or
		// an unusable first response would not be cleaned up until ctx itself
		// is cancelled.
		streamCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc)
		if err != nil {
			t.logger.Warnf("lrs: failed to create stream: %v", err)
			return false
		}
		t.logger.Infof("lrs: created LRS stream")

		if err = t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
			t.logger.Warnf("lrs: failed to send first request: %v", err)
			return false
		}

		clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
		if err != nil {
			t.logger.Warnf("%v", err)
			return false
		}
		if interval <= 0 {
			// time.NewTicker panics on a non-positive interval. Treat it as a
			// failed handshake so the retry backs off instead of spinning.
			t.logger.Warnf("lrs: invalid load reporting interval %v from server", interval)
			return false
		}

		t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval)
		return true
	}

	retries := 0
	for {
		if ctx.Err() != nil {
			return
		}

		if retries != 0 {
			timer := time.NewTimer(t.backoff(retries))
			select {
			case <-timer.C:
			case <-ctx.Done():
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		}

		retries++
		if runStream() {
			retries = 0
		}
	}
}

// sendLoads reports the load data for clusterNames from store on stream once
// per interval. It returns when ctx is done or a send fails. A non-positive
// interval is rejected up front because time.NewTicker panics on it.
func (t *Controller) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) {
	if interval <= 0 {
		t.logger.Warnf("lrs: invalid load reporting interval %v, not reporting loads", interval)
		return
	}

	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
		case <-ctx.Done():
			return
		}
		if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
			t.logger.Warnf("%v", err)
			return
		}
	}
}