// pkg/istio-agent/xds_proxy.go

// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package istioagent

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "encoding/json"
    "fmt"
    "math"
    "net"
    "net/http"
    "net/url"
    "os"
    "strings"
    "sync"
    "time"

    discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    "go.uber.org/atomic"
    google_rpc "google.golang.org/genproto/googleapis/rpc/status"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/reflection"
    any "google.golang.org/protobuf/types/known/anypb"
    meshconfig "istio.io/api/mesh/v1alpha1"
    "istio.io/pkg/log"

    "github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status/ready"
    "github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
    istiogrpc "github.com/apache/dubbo-go-pixiu/pilot/pkg/grpc"
    "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
    v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
    "github.com/apache/dubbo-go-pixiu/pkg/config/constants"
    dnsProto "github.com/apache/dubbo-go-pixiu/pkg/dns/proto"
    "github.com/apache/dubbo-go-pixiu/pkg/istio-agent/health"
    "github.com/apache/dubbo-go-pixiu/pkg/istio-agent/metrics"
    istiokeepalive "github.com/apache/dubbo-go-pixiu/pkg/keepalive"
    "github.com/apache/dubbo-go-pixiu/pkg/uds"
    "github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
    "github.com/apache/dubbo-go-pixiu/pkg/wasm"
    "github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/caclient"
    "github.com/apache/dubbo-go-pixiu/security/pkg/pki/util"
)

const (
    defaultClientMaxReceiveMessageSize = math.MaxInt32
    defaultInitialConnWindowSize       = 1024 * 1024 // default gRPC ConnWindowSize
    defaultInitialWindowSize           = 1024 * 1024 // default gRPC InitialWindowSize
)

var connectionNumber = atomic.NewUint32(0)

// ResponseHandler handles an XDS response in the agent. These responses are not forwarded to Envoy.
// Currently, all handlers function on a single resource per type, so the API only exposes one
// resource.
type ResponseHandler func(resp *any.Any) error

// XdsProxy proxies all XDS requests from Envoy to istiod, in addition to allowing
// subsystems inside the agent to also communicate with either istiod or Envoy (e.g. DNS, SDS, etc.).
// The goal here is to consolidate all XDS-related connections to istiod/Envoy into a
// single TCP connection with multiple gRPC streams.
// TODO: Right now, the workloadSDS server and gatewaySDS servers are still separate
// connections. These need to be consolidated.
// TODO: consolidate/use ADSC struct - a lot of duplication.
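//
// Subsystems hook into the proxy by registering a ResponseHandler for a type
// URL; matching responses are consumed locally instead of being forwarded to
// Envoy. A minimal sketch of such a registration (mirroring what initXdsProxy
// does below for the NDS name table; shown here only for illustration):
//
//    proxy.handlers[v3.NameTableType] = func(resp *any.Any) error {
//        var nt dnsProto.NameTable
//        if err := resp.UnmarshalTo(&nt); err != nil {
//            return err
//        }
//        ia.localDNSServer.UpdateLookupTable(&nt)
//        return nil
//    }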
type XdsProxy struct {
    stopChan             chan struct{}
    clusterID            string
    downstreamListener   net.Listener
    downstreamGrpcServer *grpc.Server
    istiodAddress        string
    istiodDialOptions    []grpc.DialOption
    optsMutex            sync.RWMutex
    handlers             map[string]ResponseHandler
    healthChecker        *health.WorkloadHealthChecker
    xdsHeaders           map[string]string
    xdsUdsPath           string
    proxyAddresses       []string
    httpTapServer        *http.Server
    tapMutex             sync.RWMutex
    tapResponseChannel   chan *discovery.DiscoveryResponse

    // connected stores the active gRPC stream. The proxy will only have 1 connection at a time
    connected           *ProxyConnection
    initialRequest      *discovery.DiscoveryRequest
    initialDeltaRequest *discovery.DeltaDiscoveryRequest
    connectedMutex      sync.RWMutex

    // Wasm cache and ecds channel are used to replace wasm remote load with local file.
    wasmCache wasm.Cache

    // ecds version and nonce uses atomic only to prevent race in testing.
    // In reality there should not be race as istiod will only have one
    // in flight update for each type of resource.
    // TODO(bianpengyuan): this relies on the fact that istiod versions all ECDS resources
    // the same in an update response. This needs update to support per resource versioning,
    // in case istiod changes its behavior, or a different ECDS server is used.
    ecdsLastAckVersion atomic.String
    ecdsLastNonce      atomic.String

    downstreamGrpcOptions []grpc.ServerOption
    istiodSAN             string
}

var proxyLog = log.RegisterScope("xdsproxy", "XDS Proxy in Istio Agent", 0)

const (
    localHostIPv4 = "127.0.0.1"
    localHostIPv6 = "[::1]"
)

func initXdsProxy(ia *Agent) (*XdsProxy, error) {
    var err error
    localHostAddr := localHostIPv4
    if ia.cfg.IsIPv6 {
        localHostAddr = localHostIPv6
    }
    var envoyProbe ready.Prober
    if !ia.cfg.DisableEnvoy {
        envoyProbe = &ready.Probe{
            AdminPort:     uint16(ia.proxyConfig.ProxyAdminPort),
            LocalHostAddr: localHostAddr,
        }
    }
    cache := wasm.NewLocalFileCache(constants.IstioDataDir, wasm.DefaultWasmModulePurgeInterval,
        wasm.DefaultWasmModuleExpiry, ia.cfg.WASMInsecureRegistries)
    proxy := &XdsProxy{
        istiodAddress:         ia.proxyConfig.DiscoveryAddress,
        istiodSAN:             ia.cfg.IstiodSAN,
        clusterID:             ia.secOpts.ClusterID,
        handlers:              map[string]ResponseHandler{},
        stopChan:              make(chan struct{}),
        healthChecker:         health.NewWorkloadHealthChecker(ia.proxyConfig.ReadinessProbe, envoyProbe, ia.cfg.ProxyIPAddresses, ia.cfg.IsIPv6),
        xdsHeaders:            ia.cfg.XDSHeaders,
        xdsUdsPath:            ia.cfg.XdsUdsPath,
        wasmCache:             cache,
        proxyAddresses:        ia.cfg.ProxyIPAddresses,
        downstreamGrpcOptions: ia.cfg.DownstreamGrpcOptions,
    }

    if ia.localDNSServer != nil {
        proxy.handlers[v3.NameTableType] = func(resp *any.Any) error {
            var nt dnsProto.NameTable
            if err := resp.UnmarshalTo(&nt); err != nil {
                log.Errorf("failed to unmarshal name table: %v", err)
                return err
            }
            ia.localDNSServer.UpdateLookupTable(&nt)
            return nil
        }
    }
    if ia.cfg.EnableDynamicProxyConfig && ia.secretCache != nil {
        proxy.handlers[v3.ProxyConfigType] = func(resp *any.Any) error {
            pc := &meshconfig.ProxyConfig{}
            if err := resp.UnmarshalTo(pc); err != nil {
                log.Errorf("failed to unmarshal proxy config: %v", err)
                return err
            }
            caCerts := pc.GetCaCertificatesPem()
            log.Debugf("received new certificates to add to mesh trust domain: %v", caCerts)
            trustBundle := []byte{}
            for _, cert := range caCerts {
                trustBundle = util.AppendCertByte(trustBundle, []byte(cert))
            }
            return ia.secretCache.UpdateConfigTrustBundle(trustBundle)
        }
    }

    proxyLog.Infof("Initializing with upstream address %q and cluster %q", proxy.istiodAddress, proxy.clusterID)

    if err = proxy.initDownstreamServer(); err != nil {
        return nil, err
    }

    if err = proxy.InitIstiodDialOptions(ia); err != nil {
        return nil, err
    }

    go func() {
        if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
            log.Errorf("failed to accept downstream gRPC connection %v", err)
        }
    }()

    go proxy.healthChecker.PerformApplicationHealthCheck(func(healthEvent *health.ProbeEvent) {
        // Store the same response as Delta and SotW. Depending on how Envoy connects we will use one or the other.
        var req *discovery.DiscoveryRequest
        if healthEvent.Healthy {
            req = &discovery.DiscoveryRequest{TypeUrl: v3.HealthInfoType}
        } else {
            req = &discovery.DiscoveryRequest{
                TypeUrl: v3.HealthInfoType,
                ErrorDetail: &google_rpc.Status{
                    Code:    int32(codes.Internal),
                    Message: healthEvent.UnhealthyMessage,
                },
            }
        }
        proxy.PersistRequest(req)

        var deltaReq *discovery.DeltaDiscoveryRequest
        if healthEvent.Healthy {
            deltaReq = &discovery.DeltaDiscoveryRequest{TypeUrl: v3.HealthInfoType}
        } else {
            deltaReq = &discovery.DeltaDiscoveryRequest{
                TypeUrl: v3.HealthInfoType,
                ErrorDetail: &google_rpc.Status{
                    Code:    int32(codes.Internal),
                    Message: healthEvent.UnhealthyMessage,
                },
            }
        }
        proxy.PersistDeltaRequest(deltaReq)
    }, proxy.stopChan)

    return proxy, nil
}

// PersistRequest sends a request to the currently connected proxy. Additionally, on any reconnection
// to the upstream XDS server we will resend this request.
func (p *XdsProxy) PersistRequest(req *discovery.DiscoveryRequest) {
    var ch chan *discovery.DiscoveryRequest
    var stop chan struct{}

    p.connectedMutex.Lock()
    if p.connected != nil {
        ch = p.connected.requestsChan
        stop = p.connected.stopChan
    }
    p.initialRequest = req
    p.connectedMutex.Unlock()

    // Immediately send if we are currently connected.
    if ch != nil {
        select {
        case ch <- req:
        case <-stop:
        }
    }
}

func (p *XdsProxy) UnregisterStream(c *ProxyConnection) {
    p.connectedMutex.Lock()
    defer p.connectedMutex.Unlock()
    if p.connected != nil && p.connected == c {
        close(p.connected.stopChan)
        p.connected = nil
    }
}

func (p *XdsProxy) RegisterStream(c *ProxyConnection) {
    p.connectedMutex.Lock()
    defer p.connectedMutex.Unlock()
    if p.connected != nil {
        proxyLog.Warnf("registered overlapping stream; closing previous")
        close(p.connected.stopChan)
    }
    p.connected = c
}

type ProxyConnection struct {
    conID              uint32
    upstreamError      chan error
    downstreamError    chan error
    requestsChan       chan *discovery.DiscoveryRequest
    responsesChan      chan *discovery.DiscoveryResponse
    deltaRequestsChan  chan *discovery.DeltaDiscoveryRequest
    deltaResponsesChan chan *discovery.DeltaDiscoveryResponse
    stopChan           chan struct{}
    downstream         adsStream
    upstream           discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
    downstreamDeltas   discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
    upstreamDeltas     discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
}

// sendRequest is a small wrapper around sending to con.requestsChan. This ensures that we do not
// block forever on a connection that has already been stopped.
func (con *ProxyConnection) sendRequest(req *discovery.DiscoveryRequest) {
    select {
    case con.requestsChan <- req:
    case <-con.stopChan:
    }
}

type adsStream interface {
    Send(*discovery.DiscoveryResponse) error
    Recv() (*discovery.DiscoveryRequest, error)
    Context() context.Context
}

// Every time Envoy makes a fresh connection to the agent, we reestablish a new connection to the upstream xds.
// This ensures that a new connection between istiod and agent doesn't end up consuming pending messages from Envoy
// as the new connection may not go to the same istiod. The vice versa case also applies.
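//
// Each accepted downstream stream is pumped through a pair of channels, roughly
// as follows (an illustrative sketch of the data flow, not an exact call graph):
//
//    Envoy --Recv--> requestsChan  --Send--> istiod   (handleUpstreamRequest)
//    Envoy <--Send-- responsesChan <--Recv-- istiod   (handleUpstreamResponse)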
func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    proxyLog.Debugf("accepted XDS connection from Envoy, forwarding to upstream XDS server")
    return p.handleStream(downstream)
}

func (p *XdsProxy) handleStream(downstream adsStream) error {
    con := &ProxyConnection{
        conID:           connectionNumber.Inc(),
        upstreamError:   make(chan error, 2), // can be produced by recv and send
        downstreamError: make(chan error, 2), // can be produced by recv and send
        requestsChan:    make(chan *discovery.DiscoveryRequest, 10),
        responsesChan:   make(chan *discovery.DiscoveryResponse, 10),
        stopChan:        make(chan struct{}),
        downstream:      downstream,
    }

    p.RegisterStream(con)
    defer p.UnregisterStream(con)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    upstreamConn, err := p.buildUpstreamConn(ctx)
    if err != nil {
        proxyLog.Errorf("failed to connect to upstream %s: %v", p.istiodAddress, err)
        metrics.IstiodConnectionFailures.Increment()
        return err
    }
    defer upstreamConn.Close()

    xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
    ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
    for k, v := range p.xdsHeaders {
        ctx = metadata.AppendToOutgoingContext(ctx, k, v)
    }
    // We must propagate upstream termination to Envoy. This ensures that we resume the full XDS sequence on a new connection.
    return p.HandleUpstream(ctx, con, xds)
}

func (p *XdsProxy) buildUpstreamConn(ctx context.Context) (*grpc.ClientConn, error) {
    p.optsMutex.RLock()
    opts := make([]grpc.DialOption, 0, len(p.istiodDialOptions))
    opts = append(opts, p.istiodDialOptions...)
    p.optsMutex.RUnlock()
    return grpc.DialContext(ctx, p.istiodAddress, opts...)
}

func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
    upstream, err := xds.StreamAggregatedResources(ctx, grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
    if err != nil {
        // Envoy logs errors again, so no need to log beyond debug level
        proxyLog.Debugf("failed to create upstream grpc client: %v", err)
        // Increment the metric on XDS connection errors, e.g. when the ingress gateway
        // or sidecar was not restarted after the root CA changed.
        metrics.IstiodConnectionErrors.Increment()
        return err
    }
    proxyLog.Infof("connected to upstream XDS server: %s", p.istiodAddress)
    defer proxyLog.Debugf("disconnected from XDS server: %s", p.istiodAddress)

    con.upstream = upstream

    // Handle upstream xds recv
    go func() {
        for {
            // from istiod
            resp, err := con.upstream.Recv()
            if err != nil {
                select {
                case con.upstreamError <- err:
                case <-con.stopChan:
                }
                return
            }
            select {
            case con.responsesChan <- resp:
            case <-con.stopChan:
            }
        }
    }()

    go p.handleUpstreamRequest(con)
    go p.handleUpstreamResponse(con)

    for {
        select {
        case err := <-con.upstreamError:
            // error from upstream Istiod.
            if istiogrpc.IsExpectedGRPCError(err) {
                proxyLog.Debugf("upstream [%d] terminated with status %v", con.conID, err)
                metrics.IstiodConnectionCancellations.Increment()
            } else {
                proxyLog.Warnf("upstream [%d] terminated with unexpected error %v", con.conID, err)
                metrics.IstiodConnectionErrors.Increment()
            }
            return err
        case err := <-con.downstreamError:
            // error from downstream Envoy.
            if istiogrpc.IsExpectedGRPCError(err) {
                proxyLog.Debugf("downstream [%d] terminated with status %v", con.conID, err)
                metrics.EnvoyConnectionCancellations.Increment()
            } else {
                proxyLog.Warnf("downstream [%d] terminated with unexpected error %v", con.conID, err)
                metrics.EnvoyConnectionErrors.Increment()
            }
            // On downstream error, we will return. This propagates the error to downstream Envoy, which will trigger a reconnect.
            return err
        case <-con.stopChan:
            proxyLog.Debugf("stream stopped")
            return nil
        }
    }
}

func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
    initialRequestsSent := atomic.NewBool(false)
    go func() {
        for {
            // recv xds requests from envoy
            req, err := con.downstream.Recv()
            if err != nil {
                select {
                case con.downstreamError <- err:
                case <-con.stopChan:
                }
                return
            }

            // forward to istiod
            con.sendRequest(req)
            if !initialRequestsSent.Load() && req.TypeUrl == v3.ListenerType {
                // fire off an initial NDS request
                if _, f := p.handlers[v3.NameTableType]; f {
                    con.sendRequest(&discovery.DiscoveryRequest{
                        TypeUrl: v3.NameTableType,
                    })
                }

                // fire off an initial PCDS request
                if _, f := p.handlers[v3.ProxyConfigType]; f {
                    con.sendRequest(&discovery.DiscoveryRequest{
                        TypeUrl: v3.ProxyConfigType,
                    })
                }

                // set flag before sending the initial request to prevent race.
                initialRequestsSent.Store(true)

                // Fire off a configured initial request, if there is one
                p.connectedMutex.RLock()
                initialRequest := p.initialRequest
                if initialRequest != nil {
                    con.sendRequest(initialRequest)
                }
                p.connectedMutex.RUnlock()
            }
        }
    }()

    defer con.upstream.CloseSend() // nolint
    for {
        select {
        case req := <-con.requestsChan:
            if req.TypeUrl == v3.HealthInfoType && !initialRequestsSent.Load() {
                // only send healthcheck probe after LDS request has been sent
                continue
            }
            proxyLog.Debugf("request for type url %s", req.TypeUrl)
            metrics.XdsProxyRequests.Increment()
            if req.TypeUrl == v3.ExtensionConfigurationType {
                if req.VersionInfo != "" {
                    p.ecdsLastAckVersion.Store(req.VersionInfo)
                }
                p.ecdsLastNonce.Store(req.ResponseNonce)
            }
            if err := sendUpstream(con.upstream, req); err != nil {
                proxyLog.Errorf("upstream [%d] send error for type url %s: %v", con.conID, req.TypeUrl, err)
                con.upstreamError <- err
                return
            }
        case <-con.stopChan:
            return
        }
    }
}

func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
    forwardEnvoyCh := make(chan *discovery.DiscoveryResponse, 1)
    for {
        select {
        case resp := <-con.responsesChan:
            // TODO: separate upstream response handling from requests sending, which are both time costly
            proxyLog.Debugf("response for type url %s", resp.TypeUrl)
            metrics.XdsProxyResponses.Increment()
            if h, f := p.handlers[resp.TypeUrl]; f {
                if len(resp.Resources) == 0 {
                    // Empty response, nothing to do
                    // This assumes internal types are always singleton
                    break
                }
                err := h(resp.Resources[0])
                var errorResp *google_rpc.Status
                if err != nil {
                    errorResp = &google_rpc.Status{
                        Code:    int32(codes.Internal),
                        Message: err.Error(),
                    }
                }
                // Send ACK/NACK
                con.sendRequest(&discovery.DiscoveryRequest{
                    VersionInfo:   resp.VersionInfo,
                    TypeUrl:       resp.TypeUrl,
                    ResponseNonce: resp.Nonce,
                    ErrorDetail:   errorResp,
                })
                continue
            }
            switch resp.TypeUrl {
            case v3.ExtensionConfigurationType:
                if features.WasmRemoteLoadConversion {
                    // If the Wasm remote load conversion feature is enabled, rewrite and send.
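                    // Note: the conversion may need to download the Wasm module into the
                    // local file cache, which can block, so it runs in its own goroutine
                    // and this loop keeps draining responsesChan in the meantime.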
                    go p.rewriteAndForward(con, resp, func(resp *discovery.DiscoveryResponse) {
                        // Forward the response using the thread of `handleUpstreamResponse`
                        // to prevent concurrent access to forwardToEnvoy
                        select {
                        case forwardEnvoyCh <- resp:
                        case <-con.stopChan:
                        }
                    })
                } else {
                    // Otherwise, forward ECDS resource update directly to Envoy.
                    forwardToEnvoy(con, resp)
                }
            default:
                if strings.HasPrefix(resp.TypeUrl, "istio.io/debug") {
                    p.forwardToTap(resp)
                } else {
                    forwardToEnvoy(con, resp)
                }
            }
        case resp := <-forwardEnvoyCh:
            forwardToEnvoy(con, resp)
        case <-con.stopChan:
            return
        }
    }
}

func (p *XdsProxy) rewriteAndForward(con *ProxyConnection, resp *discovery.DiscoveryResponse, forward func(resp *discovery.DiscoveryResponse)) {
    sendNack := wasm.MaybeConvertWasmExtensionConfig(resp.Resources, p.wasmCache)
    if sendNack {
        proxyLog.Debugf("sending NACK for ECDS resources %+v", resp.Resources)
        con.sendRequest(&discovery.DiscoveryRequest{
            VersionInfo:   p.ecdsLastAckVersion.Load(),
            TypeUrl:       v3.ExtensionConfigurationType,
            ResponseNonce: resp.Nonce,
            ErrorDetail: &google_rpc.Status{
                // TODO(bianpengyuan): make error message more informative.
                Message: "failed to fetch wasm module",
            },
        })
        return
    }
    proxyLog.Debugf("forward ECDS resources %+v", resp.Resources)
    forward(resp)
}

func (p *XdsProxy) forwardToTap(resp *discovery.DiscoveryResponse) {
    select {
    case p.tapResponseChannel <- resp:
    default:
        log.Infof("tap response %q arrived too late; discarding", resp.TypeUrl)
    }
}

func forwardToEnvoy(con *ProxyConnection, resp *discovery.DiscoveryResponse) {
    if !v3.IsEnvoyType(resp.TypeUrl) {
        proxyLog.Errorf("Skipping forwarding type url %s to Envoy as it is not a valid Envoy type", resp.TypeUrl)
        return
    }
    if err := sendDownstream(con.downstream, resp); err != nil {
        select {
        case con.downstreamError <- err:
            // we cannot return a partial error and hope to restart just the downstream
            // as we are blindly proxying requests/responses. For now, the best course of action
            // is to terminate the upstream connection as well and restart afresh.
            proxyLog.Errorf("downstream [%d] send error: %v", con.conID, err)
        default:
            // Do not block on pushing to the downstream error channel. This can happen when
            // forwarding is triggered from a separate goroutine (e.g. the ECDS processing
            // goroutine) while the downstream connection has already been torn down and no
            // receiver is available for the downstream error channel.
            proxyLog.Debugf("downstream [%d] error channel full, but got downstream send error: %v", con.conID, err)
        }
        return
    }
}

func (p *XdsProxy) close() {
    close(p.stopChan)
    p.wasmCache.Cleanup()
    if p.httpTapServer != nil {
        _ = p.httpTapServer.Close()
    }
    if p.downstreamGrpcServer != nil {
        p.downstreamGrpcServer.Stop()
    }
    if p.downstreamListener != nil {
        _ = p.downstreamListener.Close()
    }
}

func (p *XdsProxy) initDownstreamServer() error {
    l, err := uds.NewListener(p.xdsUdsPath)
    if err != nil {
        return err
    }
    // TODO: Expose keepalive options to agent cmd line flags.
    opts := p.downstreamGrpcOptions
    opts = append(opts, istiogrpc.ServerOptions(istiokeepalive.DefaultOption())...)
    grpcs := grpc.NewServer(opts...)
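    // Serve the proxy itself as the ADS server that Envoy dials over UDS; gRPC server
    // reflection is enabled to make the downstream server easier to inspect (e.g. with grpcurl).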
    discovery.RegisterAggregatedDiscoveryServiceServer(grpcs, p)
    reflection.Register(grpcs)
    p.downstreamGrpcServer = grpcs
    p.downstreamListener = l
    return nil
}

func (p *XdsProxy) InitIstiodDialOptions(agent *Agent) error {
    opts, err := p.buildUpstreamClientDialOpts(agent)
    if err != nil {
        return err
    }

    p.optsMutex.Lock()
    p.istiodDialOptions = opts
    p.optsMutex.Unlock()
    return nil
}

func (p *XdsProxy) buildUpstreamClientDialOpts(sa *Agent) ([]grpc.DialOption, error) {
    tlsOpts, err := p.getTLSDialOption(sa)
    if err != nil {
        return nil, fmt.Errorf("failed to build TLS dial option to talk to upstream: %v", err)
    }

    keepaliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:    30 * time.Second,
        Timeout: 10 * time.Second,
    })

    initialWindowSizeOption := grpc.WithInitialWindowSize(int32(defaultInitialWindowSize))
    initialConnWindowSizeOption := grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize))
    msgSizeOption := grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
    // Make sure the dial is blocking as we don't want any other operation to resume until the
    // connection to upstream has been made.
    dialOptions := []grpc.DialOption{
        tlsOpts,
        keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption, msgSizeOption,
    }

    dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(caclient.NewXDSTokenProvider(sa.secOpts)))
    return dialOptions, nil
}

// getTLSDialOption returns the TLS option to use when talking to Istiod.
// If a provisioned cert is set, it will return an mTLS-related config.
// Else it will return a one-way TLS-related config with the assumption
// that the consumer code will use tokens to authenticate the upstream.
func (p *XdsProxy) getTLSDialOption(agent *Agent) (grpc.DialOption, error) {
    if agent.proxyConfig.ControlPlaneAuthPolicy == meshconfig.AuthenticationPolicy_NONE {
        return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
    }
    rootCert, err := p.getRootCertificate(agent)
    if err != nil {
        return nil, err
    }

    config := tls.Config{
        GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
            var certificate tls.Certificate
            key, cert := agent.GetKeyCertsForXDS()
            if key != "" && cert != "" {
                // Load the certificate from disk
                certificate, err = tls.LoadX509KeyPair(cert, key)
                if err != nil {
                    return nil, err
                }
            }
            return &certificate, nil
        },
        RootCAs: rootCert,
    }

    // strip the port from the address
    parts := strings.Split(agent.proxyConfig.DiscoveryAddress, ":")
    config.ServerName = parts[0]
    // For debugging on localhost (with port forward)
    // This matches the logic for the CA; this code should eventually be shared
    if strings.Contains(config.ServerName, "localhost") {
        config.ServerName = "istiod.dubbo-system.svc"
    }
    if p.istiodSAN != "" {
        config.ServerName = p.istiodSAN
    }
    // TODO: if istiodSAN starts with spiffe://, use custom validation.
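    // Require TLS 1.2 or newer for the connection to istiod; older protocol
    // versions are rejected during the handshake.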
    config.MinVersion = tls.VersionTLS12
    transportCreds := credentials.NewTLS(&config)
    return grpc.WithTransportCredentials(transportCreds), nil
}

func (p *XdsProxy) getRootCertificate(agent *Agent) (*x509.CertPool, error) {
    var certPool *x509.CertPool
    var rootCert []byte

    xdsCACertPath, err := agent.FindRootCAForXDS()
    if err != nil {
        return nil, fmt.Errorf("failed to find root CA cert for XDS: %v", err)
    }
    if xdsCACertPath != "" {
        rootCert, err = os.ReadFile(xdsCACertPath)
        if err != nil {
            return nil, err
        }
        certPool = x509.NewCertPool()
        ok := certPool.AppendCertsFromPEM(rootCert)
        if !ok {
            return nil, fmt.Errorf("failed to create TLS dial option with root certificates")
        }
    } else {
        certPool, err = x509.SystemCertPool()
        if err != nil {
            return nil, err
        }
    }
    return certPool, nil
}

// sendUpstream sends a discovery request to istiod.
func sendUpstream(upstream xds.DiscoveryClient, request *discovery.DiscoveryRequest) error {
    return istiogrpc.Send(upstream.Context(), func() error {
        return upstream.Send(request)
    })
}

// sendDownstream sends a discovery response to Envoy.
func sendDownstream(downstream adsStream, response *discovery.DiscoveryResponse) error {
    return istiogrpc.Send(downstream.Context(), func() error {
        return downstream.Send(response)
    })
}

// tapRequest sends "req" to Istiod, and returns a matching response, or nil on timeout.
// Requests are serialized -- only one may be in-flight at a time.
func (p *XdsProxy) tapRequest(req *discovery.DiscoveryRequest, timeout time.Duration) (*discovery.DiscoveryResponse, error) {
    if p.connected == nil {
        return nil, fmt.Errorf("proxy not connected to Istiod")
    }

    // Only allow one tap request at a time
    p.tapMutex.Lock()
    defer p.tapMutex.Unlock()

    // Send to Istiod
    p.connected.sendRequest(req)

    // Wait for expected response or timeout
    for {
        select {
        case res := <-p.tapResponseChannel:
            if res.TypeUrl == req.TypeUrl {
                return res, nil
            }
        case <-time.After(timeout):
            return nil, nil
        }
    }
}

func (p *XdsProxy) makeTapHandler() func(w http.ResponseWriter, req *http.Request) {
    return func(w http.ResponseWriter, req *http.Request) {
        qp, err := url.ParseQuery(req.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprintf(w, "%v\n", err)
            return
        }
        typeURL := fmt.Sprintf("istio.io%s", req.URL.Path)
        dr := discovery.DiscoveryRequest{
            TypeUrl: typeURL,
        }
        resourceName := qp.Get("resourceName")
        if resourceName != "" {
            dr.ResourceNames = []string{resourceName}
        }
        response, err := p.tapRequest(&dr, 5*time.Second)
        if err != nil {
            w.WriteHeader(http.StatusServiceUnavailable)
            fmt.Fprintf(w, "%v\n", err)
            return
        }

        if response == nil {
            log.Infof("timed out waiting for Istiod to respond to %q", typeURL)
            w.WriteHeader(http.StatusGatewayTimeout)
            return
        }

        // Try to marshal Istiod's response using protojson (needed for Envoy protobufs)
        w.Header().Add("Content-Type", "application/json")
        b, err := protomarshal.MarshalIndent(response, "  ")
        if err == nil {
            _, err = w.Write(b)
            if err != nil {
                log.Infof("fail to write debug response: %v", err)
            }
            return
        }

        // Failed as protobuf. Try as regular JSON
        proxyLog.Warnf("could not marshal istiod response as pb: %v", err)
        j, err := json.Marshal(response)
        if err != nil {
            // Couldn't marshal at all
            w.WriteHeader(http.StatusInternalServerError)
            fmt.Fprintf(w, "%v\n", err)
            return
        }
        _, err = w.Write(j)
        if err != nil {
            log.Infof("fail to write debug response: %v", err)
            return
        }
    }
}

// initDebugInterface listens on localhost:${PORT} for paths under /debug/,
// forwards the paths to Istiod as xDS requests, waits for the response from
// Istiod, and sends it back as JSON.
func (p *XdsProxy) initDebugInterface(port int) error {
    p.tapResponseChannel = make(chan *discovery.DiscoveryResponse)

    httpMux := http.NewServeMux()
    handler := p.makeTapHandler()
    httpMux.HandleFunc("/debug/", handler)
    httpMux.HandleFunc("/debug", handler) // For 1.10 Istiod which uses istio.io/debug

    p.httpTapServer = &http.Server{
        Addr:        fmt.Sprintf("localhost:%d", port),
        Handler:     httpMux,
        IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
        ReadTimeout: 30 * time.Second,
    }

    // create HTTP listener
    listener, err := net.Listen("tcp", p.httpTapServer.Addr)
    if err != nil {
        return err
    }

    go func() {
        log.Infof("starting Http service at %s", listener.Addr())
        if err := p.httpTapServer.Serve(listener); err != nil {
            log.Errorf("error serving tap http server: %v", err)
        }
    }()

    return nil
}
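
// Illustrative usage of the debug interface (the port and endpoint below are
// examples, not defined in this file; upstream Istio defaults this tap port to
// 15004 and istiod exposes debug endpoints such as syncz):
//
//    curl 'localhost:15004/debug/syncz'
//
// The tap handler rewrites the path into the type URL "istio.io/debug/syncz",
// sends it upstream over the connected XDS stream, and renders istiod's
// DiscoveryResponse as JSON.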