pilot/cmd/pilot-agent/status/server.go (675 lines of code) (raw):

// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package status implements the pilot-agent status server: the agent readiness
// endpoint, merged Prometheus metrics, proxied application probes and a few
// localhost-only debug endpoints.
package status

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"mime"
	"net"
	"net/http"
	"net/http/pprof"
	"os"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

import (
	ocprom "contrib.go.opencensus.io/exporter/prometheus"
	"github.com/hashicorp/go-multierror"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/common/expfmt"
	"go.opencensus.io/stats/view"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
	grpcStatus "google.golang.org/grpc/status"
	"istio.io/pkg/env"
	"istio.io/pkg/log"
	"k8s.io/apimachinery/pkg/util/intstr"
)

import (
	"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/metrics"
	"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status/grpcready"
	"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status/ready"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
	"github.com/apache/dubbo-go-pixiu/pkg/config"
	dnsProto "github.com/apache/dubbo-go-pixiu/pkg/dns/proto"
	"github.com/apache/dubbo-go-pixiu/pkg/kube/apimirror"
)

const (
	// readyPath is the HTTP path that reports the readiness of the pilot agent itself.
	readyPath = "/healthz/ready"
	// quitPath is the HTTP path used to ask the pilot agent to quit.
	quitPath = "/quitquitquit"
	// KubeAppProberEnvName is the name of the environment variable used to pass the
	// application prober config to the pilot agent. Its value is a JSON encoded
	// string carrying the app probe information from the injector (istioctl or webhook).
	// For example,
	//   ISTIO_KUBE_APP_PROBERS='{"/app-health/httpbin/livez":{"httpGet":{"path": "/hello", "port": 8080}}}'
	// indicates that the httpbin container liveness prober port is 8080 and the probing path is /hello.
	// This environment variable should never be set manually.
	KubeAppProberEnvName = "ISTIO_KUBE_APP_PROBERS"

	// localHostIPv4 and localHostIPv6 are the loopback hosts handed to the Envoy
	// readiness probe; the IPv6 form is already bracketed for use in host:port strings.
	localHostIPv4 = "127.0.0.1"
	localHostIPv6 = "[::1]"
)

// UpstreamLocalAddressIPv4 and UpstreamLocalAddressIPv6 are the local (source)
// addresses the status server binds to when it dials the application for
// HTTP, TCP and gRPC probes.
var (
	UpstreamLocalAddressIPv4 = &net.TCPAddr{IP: net.ParseIP("127.0.0.6")}
	UpstreamLocalAddressIPv6 = &net.TCPAddr{IP: net.ParseIP("::6")}
)

// PrometheusScrapingConfig is the ISTIO_PROMETHEUS_ANNOTATIONS environment variable.
// When set, NewServer decodes its value as a JSON PrometheusScrapeConfiguration
// describing where the application's own metrics can be scraped.
var PrometheusScrapingConfig = env.RegisterStringVar("ISTIO_PROMETHEUS_ANNOTATIONS", "", "")

var (
	// appProberPattern is the only shape accepted for keys of KubeAppProbers:
	// /app-health/<container>/(livez|readyz|startupz).
	appProberPattern = regexp.MustCompile(`^/app-health/[^/]+/(livez|readyz|startupz)$`)

	// promRegistry holds the agent's own metrics; it is populated in init and
	// gathered by scrapeAgentMetrics.
	promRegistry *prometheus.Registry

	// LegacyLocalhostProbeDestination makes application probes target "localhost"
	// instead of the pod IP.
	LegacyLocalhostProbeDestination = env.RegisterBoolVar("REWRITE_PROBE_LEGACY_LOCALHOST_DESTINATION", false,
		"If enabled, readiness probes will be sent to 'localhost'. Otherwise, they will be sent to the Pod's IP, matching Kubernetes' behavior.")

	// ProbeKeepaliveConnections is read once at package initialization (note the
	// trailing Get), so it is a plain bool. When false, the HTTP probe clients
	// disable keep-alives.
	ProbeKeepaliveConnections = env.RegisterBoolVar("ENABLE_PROBE_KEEPALIVE_CONNECTIONS", false,
		"If enabled, readiness probes will keep the connection from pilot-agent to the application alive. "+
			"This mirrors older Istio versions' behaviors, but not kubelet's.").Get()
)

// KubeAppProbers holds the information about a Kubernetes pod prober.
// It's a map from the prober URL path to the Kubernetes Prober config.
// For example, "/app-health/hello-world/livez" entry contains liveness prober config for
// container "hello-world".
type KubeAppProbers map[string]*Prober

// Prober represents a single container prober.
// Exactly one of HTTPGet, TCPSocket or GRPC is expected to be set; NewServer
// rejects configurations where that is not the case.
type Prober struct {
	// HTTPGet describes an HTTP(S) GET probe against the application.
	HTTPGet *apimirror.HTTPGetAction `json:"httpGet,omitempty"`
	// TCPSocket describes a TCP connect probe against the application.
	TCPSocket *apimirror.TCPSocketAction `json:"tcpSocket,omitempty"`
	// GRPC describes a gRPC health-check probe against the application.
	GRPC *apimirror.GRPCAction `json:"grpc,omitempty"`
	// TimeoutSeconds bounds a single probe attempt; it is used as the HTTP client
	// timeout, the TCP dial timeout and the gRPC call deadline.
	TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"`
}

// Options for the status server.
type Options struct {
	// PodIP is the IP of the pod. Note: this is only applicable for Kubernetes pods
	// and should only be used for the prober.
	PodIP string
	// KubeAppProbers is a json with Kubernetes application prober config encoded.
	KubeAppProbers string
	// NodeType is the proxy type; application metrics scraping is only enabled
	// for model.SidecarProxy.
	NodeType model.NodeType
	// StatusPort is the port the status server listens on. Zero lets the OS pick
	// a free port (used by tests).
	StatusPort uint16
	// AdminPort is handed to the Envoy readiness probe.
	AdminPort uint16
	// IPv6 selects the IPv6 loopback host and upstream local address instead of
	// the IPv4 ones.
	IPv6 bool
	// Probes are additional readiness probes evaluated after the built-in ones.
	Probes []ready.Prober
	// EnvoyPrometheusPort is the localhost port Envoy metrics are scraped from.
	EnvoyPrometheusPort int
	// Context is handed to the Envoy readiness probe.
	Context context.Context
	// FetchDNS returns the name table served on /debug/ndsz.
	FetchDNS func() *dnsProto.NameTable
	// NoEnvoy disables the Envoy readiness probe and the Envoy metrics scrape.
	NoEnvoy bool
	// GRPCBootstrap, when non-empty, adds a grpcready probe built from this value.
	GRPCBootstrap string
}

// Server provides an endpoint for handling status probes.
type Server struct {
	// ready is the ordered list of probes that must all pass for the agent to be ready.
	ready []ready.Prober
	// prometheus is the application scrape target, or nil when application
	// metrics are not merged.
	prometheus *PrometheusScrapeConfiguration
	// mutex guards lastProbeSuccessful and the update of statusPort in Run.
	mutex sync.RWMutex
	// appProbersDestination is the host application probes are sent to.
	appProbersDestination string
	// appKubeProbers maps an /app-health/... path to its prober config.
	appKubeProbers KubeAppProbers
	// appProbeClient caches one HTTP client per HTTPGet prober path.
	appProbeClient map[string]*http.Client
	// statusPort is the port the server listens on.
	statusPort uint16
	// lastProbeSuccessful remembers the previous readiness result so the
	// "ready" message is only logged on a transition.
	lastProbeSuccessful bool
	// envoyStatsPort is the localhost port Envoy metrics are scraped from.
	envoyStatsPort int
	// fetchDNS returns the name table served on /debug/ndsz.
	fetchDNS func() *dnsProto.NameTable
	// upstreamLocalAddress is the local address used when dialing the application.
	upstreamLocalAddress *net.TCPAddr
	// config is the Options the server was created with.
	config Options
}

// init creates the agent's Prometheus registry, registers the process and Go
// collectors under the "istio_agent_" prefix and hooks the registry up as an
// OpenCensus view exporter. The process exits if the exporter cannot be created.
func init() {
	registry := prometheus.NewRegistry()
	wrapped := prometheus.WrapRegistererWithPrefix("istio_agent_", prometheus.Registerer(registry))
	wrapped.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	wrapped.MustRegister(collectors.NewGoCollector())

	promRegistry = registry
	// go collector metrics collide with other metrics.
	exporter, err := ocprom.NewExporter(ocprom.Options{Registry: registry, Registerer: wrapped})
	if err != nil {
		log.Fatalf("could not setup exporter: %v", err)
	}
	view.RegisterExporter(exporter)
}

// NewServer creates a new status server.
func NewServer(config Options) (*Server, error) { localhost := localHostIPv4 upstreamLocalAddress := UpstreamLocalAddressIPv4 if config.IPv6 { localhost = localHostIPv6 upstreamLocalAddress = UpstreamLocalAddressIPv6 } probes := make([]ready.Prober, 0) if !config.NoEnvoy { probes = append(probes, &ready.Probe{ LocalHostAddr: localhost, AdminPort: config.AdminPort, Context: config.Context, NoEnvoy: config.NoEnvoy, }) } if config.GRPCBootstrap != "" { probes = append(probes, grpcready.NewProbe(config.GRPCBootstrap)) } probes = append(probes, config.Probes...) s := &Server{ statusPort: config.StatusPort, ready: probes, appProbersDestination: wrapIPv6(config.PodIP), envoyStatsPort: config.EnvoyPrometheusPort, fetchDNS: config.FetchDNS, upstreamLocalAddress: upstreamLocalAddress, config: config, } if LegacyLocalhostProbeDestination.Get() { s.appProbersDestination = "localhost" } // Enable prometheus server if its configured and a sidecar // Because port 15020 is exposed in the gateway Services, we cannot safely serve this endpoint // If we need to do this in the future, we should use envoy to do routing or have another port to make this internal // only. For now, its not needed for gateway, as we can just get Envoy stats directly, but if we // want to expose istio-agent metrics we may want to revisit this. 
if cfg, f := PrometheusScrapingConfig.Lookup(); config.NodeType == model.SidecarProxy && f { var prom PrometheusScrapeConfiguration if err := json.Unmarshal([]byte(cfg), &prom); err != nil { return nil, fmt.Errorf("failed to unmarshal %s: %v", PrometheusScrapingConfig.Name, err) } log.Infof("Prometheus scraping configuration: %v", prom) if prom.Scrape != "false" { s.prometheus = &prom if s.prometheus.Path == "" { s.prometheus.Path = "/metrics" } if s.prometheus.Port == "" { s.prometheus.Port = "80" } if s.prometheus.Port == strconv.Itoa(int(config.StatusPort)) { return nil, fmt.Errorf("invalid prometheus scrape configuration: "+ "application port is the same as agent port, which may lead to a recursive loop. "+ "Ensure pod does not have prometheus.io/port=%d label, or that injection is not happening multiple times", config.StatusPort) } } } if config.KubeAppProbers == "" { return s, nil } if err := json.Unmarshal([]byte(config.KubeAppProbers), &s.appKubeProbers); err != nil { return nil, fmt.Errorf("failed to decode app prober err = %v, json string = %v", err, config.KubeAppProbers) } s.appProbeClient = make(map[string]*http.Client, len(s.appKubeProbers)) // Validate the map key matching the regex pattern. for path, prober := range s.appKubeProbers { err := validateAppKubeProber(path, prober) if err != nil { return nil, err } if prober.HTTPGet != nil { d := &net.Dialer{ LocalAddr: s.upstreamLocalAddress, } // Construct a http client and cache it in order to reuse the connection. 
s.appProbeClient[path] = &http.Client{ Timeout: time.Duration(prober.TimeoutSeconds) * time.Second, // We skip the verification since kubelet skips the verification for HTTPS prober as well // https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#configure-probes Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, DialContext: d.DialContext, // https://github.com/kubernetes/kubernetes/blob/0153febd9f0098d4b8d0d484927710eaf899ef40/pkg/probe/http/http.go#L55 // Match Kubernetes logic. This also ensures idle timeouts do not trigger probe failures DisableKeepAlives: !ProbeKeepaliveConnections, }, CheckRedirect: redirectChecker(), } } } return s, nil } // Copies logic from https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L129-L130 func isRedirect(code int) bool { return code >= http.StatusMultipleChoices && code < http.StatusBadRequest } // Using the same redirect logic that kubelet does: https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L141 // This means that: // * If we exceed 10 redirects, the probe fails // * If we redirect somewhere external, the probe succeeds (https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L130) // * If we redirect to the same address, the probe will follow the redirect func redirectChecker() func(*http.Request, []*http.Request) error { return func(req *http.Request, via []*http.Request) error { if req.URL.Hostname() != via[0].URL.Hostname() { return http.ErrUseLastResponse } // Default behavior: stop after 10 redirects. 
if len(via) >= 10 { return errors.New("stopped after 10 redirects") } return nil } } func validateAppKubeProber(path string, prober *Prober) error { if !appProberPattern.MatchString(path) { return fmt.Errorf(`invalid path, must be in form of regex pattern %v`, appProberPattern) } count := 0 if prober.HTTPGet != nil { count++ } if prober.TCPSocket != nil { count++ } if prober.GRPC != nil { count++ } if count != 1 { return fmt.Errorf(`invalid prober type, must be one of type httpGet, tcpSocket or gRPC`) } if prober.HTTPGet != nil && prober.HTTPGet.Port.Type != intstr.Int { return fmt.Errorf("invalid prober config for %v, the port must be int type", path) } if prober.TCPSocket != nil && prober.TCPSocket.Port.Type != intstr.Int { return fmt.Errorf("invalid prober config for %v, the port must be int type", path) } return nil } // FormatProberURL returns a set of HTTP URLs that pilot agent will serve to take over Kubernetes // app probers. func FormatProberURL(container string) (string, string, string) { return fmt.Sprintf("/app-health/%v/readyz", container), fmt.Sprintf("/app-health/%v/livez", container), fmt.Sprintf("/app-health/%v/startupz", container) } // Run opens a the status port and begins accepting probes. func (s *Server) Run(ctx context.Context) { log.Infof("Opening status port %d", s.statusPort) mux := http.NewServeMux() // Add the handler for ready probes. mux.HandleFunc(readyPath, s.handleReadyProbe) // Default path for prom mux.HandleFunc(`/metrics`, s.handleStats) // Envoy uses something else - and original agent used the same. // Keep for backward compat with configs. mux.HandleFunc(`/stats/prometheus`, s.handleStats) mux.HandleFunc(quitPath, s.handleQuit) mux.HandleFunc("/app-health/", s.handleAppProbe) // Add the handler for pprof. 
mux.HandleFunc("/debug/pprof/", s.handlePprofIndex) mux.HandleFunc("/debug/pprof/cmdline", s.handlePprofCmdline) mux.HandleFunc("/debug/pprof/profile", s.handlePprofProfile) mux.HandleFunc("/debug/pprof/symbol", s.handlePprofSymbol) mux.HandleFunc("/debug/pprof/trace", s.handlePprofTrace) mux.HandleFunc("/debug/ndsz", s.handleNdsz) l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort)) if err != nil { log.Errorf("Error listening on status port: %v", err.Error()) return } // for testing. if s.statusPort == 0 { addrs := strings.Split(l.Addr().String(), ":") allocatedPort, _ := strconv.Atoi(addrs[len(addrs)-1]) s.mutex.Lock() s.statusPort = uint16(allocatedPort) s.mutex.Unlock() } defer l.Close() go func() { if err := http.Serve(l, mux); err != nil { log.Error(err) select { case <-ctx.Done(): // We are shutting down already, don't trigger SIGTERM return default: // If the server errors then pilot-agent can never pass readiness or liveness probes // Therefore, trigger graceful termination by sending SIGTERM to the binary pid notifyExit() } } }() // Wait for the agent to be shut down. 
<-ctx.Done() log.Info("Status server has successfully terminated") } func (s *Server) handlePprofIndex(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } pprof.Index(w, r) } func (s *Server) handlePprofCmdline(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } pprof.Cmdline(w, r) } func (s *Server) handlePprofSymbol(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } pprof.Symbol(w, r) } func (s *Server) handlePprofProfile(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } pprof.Profile(w, r) } func (s *Server) handlePprofTrace(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } pprof.Trace(w, r) } func (s *Server) handleReadyProbe(w http.ResponseWriter, _ *http.Request) { err := s.isReady() s.mutex.Lock() if err != nil { w.WriteHeader(http.StatusServiceUnavailable) log.Warnf("Envoy proxy is NOT ready: %s", err.Error()) s.lastProbeSuccessful = false } else { w.WriteHeader(http.StatusOK) if !s.lastProbeSuccessful { log.Info("Envoy proxy is ready") } s.lastProbeSuccessful = true } s.mutex.Unlock() } func (s *Server) isReady() error { for _, p := range s.ready { if err := p.Check(); err != nil { return err } } return nil } func isRequestFromLocalhost(r *http.Request) bool { ip, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { return false } userIP := net.ParseIP(ip) return userIP.IsLoopback() } type PrometheusScrapeConfiguration struct { Scrape string `json:"scrape"` Path string `json:"path"` Port string 
`json:"port"` } // handleStats handles prometheus stats scraping. This will scrape envoy metrics, and, if configured, // the application metrics and merge them together. // The merge here is a simple string concatenation. This works for almost all cases, assuming the application // is not exposing the same metrics as Envoy. // This merging works for both FmtText and FmtOpenMetrics and will use the format of the application metrics // Note that we do not return any errors here. If we do, we will drop metrics. For example, the app may be having issues, // but we still want Envoy metrics. Instead, errors are tracked in the failed scrape metrics/logs. func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { metrics.ScrapeTotals.Increment() var envoy, application, agent []byte var err error // Gather all the metrics we will merge if !s.config.NoEnvoy { if envoy, _, err = s.scrape(fmt.Sprintf("http://localhost:%d/stats/prometheus", s.envoyStatsPort), r.Header); err != nil { log.Errorf("failed scraping envoy metrics: %v", err) metrics.EnvoyScrapeErrors.Increment() } // Process envoy's metrics to make them compatible with FmtOpenMetrics envoy = processMetrics(envoy) } // Scrape app metrics if defined and capture their format var format expfmt.Format if s.prometheus != nil { var contentType string url := fmt.Sprintf("http://localhost:%s%s", s.prometheus.Port, s.prometheus.Path) if application, contentType, err = s.scrape(url, r.Header); err != nil { log.Errorf("failed scraping application metrics: %v", err) metrics.AppScrapeErrors.Increment() } format = negotiateMetricsFormat(contentType) } else { // Without app metrics format use a default format = expfmt.FmtText } if agent, err = scrapeAgentMetrics(); err != nil { log.Errorf("failed scraping agent metrics: %v", err) metrics.AgentScrapeErrors.Increment() } w.Header().Set("Content-Type", string(format)) // Write out the metrics if _, err := w.Write(agent); err != nil { log.Errorf("failed to write agent 
metrics: %v", err) metrics.AgentScrapeErrors.Increment() } if envoy != nil { if _, err := w.Write(envoy); err != nil { log.Errorf("failed to write envoy metrics: %v", err) metrics.EnvoyScrapeErrors.Increment() } } // App metrics must go last because if they are FmtOpenMetrics, // they will have a trailing "# EOF" which terminates the full exposition if _, err := w.Write(application); err != nil { log.Errorf("failed to write application metrics: %v", err) metrics.AppScrapeErrors.Increment() } } func negotiateMetricsFormat(contentType string) expfmt.Format { mediaType, _, err := mime.ParseMediaType(contentType) if err == nil && mediaType == expfmt.OpenMetricsType { return expfmt.FmtOpenMetrics } return expfmt.FmtText } func processMetrics(metrics []byte) []byte { return bytes.ReplaceAll(metrics, []byte("\n\n"), []byte("\n")) } func scrapeAgentMetrics() ([]byte, error) { buf := &bytes.Buffer{} mfs, err := promRegistry.Gather() enc := expfmt.NewEncoder(buf, expfmt.FmtText) if err != nil { return nil, err } var errs error for _, mf := range mfs { if err := enc.Encode(mf); err != nil { errs = multierror.Append(errs, err) } } return buf.Bytes(), errs } func applyHeaders(into http.Header, from http.Header, keys ...string) { for _, key := range keys { val := from.Get(key) if val != "" { into.Set(key, val) } } } // getHeaderTimeout parse a string like (1.234) representing number of seconds func getHeaderTimeout(timeout string) (time.Duration, error) { timeoutSeconds, err := strconv.ParseFloat(timeout, 64) if err != nil { return 0 * time.Second, err } return time.Duration(timeoutSeconds * 1e9), nil } // scrape will send a request to the provided url to scrape metrics from // This will attempt to mimic some of Prometheus functionality by passing some of the headers through // such as accept, timeout, and user agent // Returns the scraped metrics as well as the response's "Content-Type" header to determine the metrics format func (s *Server) scrape(url string, header 
http.Header) ([]byte, string, error) { ctx := context.Background() if timeoutString := header.Get("X-Prometheus-Scrape-Timeout-Seconds"); timeoutString != "" { timeout, err := getHeaderTimeout(timeoutString) if err != nil { log.Warnf("Failed to parse timeout header %v: %v", timeoutString, err) } else { c, cancel := context.WithTimeout(ctx, timeout) ctx = c defer cancel() } } req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, "", err } applyHeaders(req.Header, header, "Accept", "User-Agent", "X-Prometheus-Scrape-Timeout-Seconds", ) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, "", fmt.Errorf("error scraping %s: %v", url, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, "", fmt.Errorf("error scraping %s, status code: %v", url, resp.StatusCode) } metrics, err := io.ReadAll(resp.Body) if err != nil { return nil, "", fmt.Errorf("error reading %s: %v", url, err) } format := resp.Header.Get("Content-Type") return metrics, format, nil } func (s *Server) handleQuit(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } if r.Method != http.MethodPost { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return } w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("OK")) log.Infof("handling %s, notifying pilot-agent to exit", quitPath) notifyExit() } func (s *Server) handleAppProbe(w http.ResponseWriter, req *http.Request) { // Validate the request first. 
path := req.URL.Path if !strings.HasPrefix(path, "/") { path = "/" + req.URL.Path } prober, exists := s.appKubeProbers[path] if !exists { log.Errorf("Prober does not exists url %v", path) w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(fmt.Sprintf("app prober config does not exists for %v", path))) return } switch { case prober.HTTPGet != nil: s.handleAppProbeHTTPGet(w, req, prober, path) case prober.TCPSocket != nil: s.handleAppProbeTCPSocket(w, prober) case prober.GRPC != nil: s.handleAppProbeGRPC(w, req, prober) } } func (s *Server) handleAppProbeHTTPGet(w http.ResponseWriter, req *http.Request, prober *Prober, path string) { proberPath := prober.HTTPGet.Path if !strings.HasPrefix(proberPath, "/") { proberPath = "/" + proberPath } var url string if prober.HTTPGet.Scheme == apimirror.URISchemeHTTPS { url = fmt.Sprintf("https://%s:%v%s", s.appProbersDestination, prober.HTTPGet.Port.IntValue(), proberPath) } else { url = fmt.Sprintf("http://%s:%v%s", s.appProbersDestination, prober.HTTPGet.Port.IntValue(), proberPath) } appReq, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { log.Errorf("Failed to create request to probe app %v, original url %v", err, path) w.WriteHeader(http.StatusInternalServerError) return } // Forward incoming headers to the application. for name, values := range req.Header { newValues := make([]string, len(values)) copy(newValues, values) appReq.Header[name] = newValues } // If there are custom HTTPHeaders, it will override the forwarding header if headers := prober.HTTPGet.HTTPHeaders; len(headers) != 0 { for _, h := range headers { delete(appReq.Header, h.Name) } for _, h := range headers { if h.Name == "Host" || h.Name == ":authority" { // Probe has specific host header override; honor it appReq.Host = h.Value appReq.Header.Set(h.Name, h.Value) } else { appReq.Header.Add(h.Name, h.Value) } } } // get the http client must exist because httpClient := s.appProbeClient[path] // Send the request. 
response, err := httpClient.Do(appReq) if err != nil { log.Errorf("Request to probe app failed: %v, original URL path = %v\napp URL path = %v", err, path, proberPath) w.WriteHeader(http.StatusInternalServerError) return } defer func() { // Drain and close the body to let the Transport reuse the connection _, _ = io.Copy(io.Discard, response.Body) _ = response.Body.Close() }() if isRedirect(response.StatusCode) { // Redirect // In other cases, we return the original status code. For redirects, it is illegal to // not have Location header, so we need to switch to just 200. w.WriteHeader(http.StatusOK) return } // We only write the status code to the response. w.WriteHeader(response.StatusCode) } func (s *Server) handleAppProbeTCPSocket(w http.ResponseWriter, prober *Prober) { port := prober.TCPSocket.Port.IntValue() timeout := time.Duration(prober.TimeoutSeconds) * time.Second d := &net.Dialer{ LocalAddr: s.upstreamLocalAddress, Timeout: timeout, } conn, err := d.Dial("tcp", fmt.Sprintf("%s:%d", s.appProbersDestination, port)) if err != nil { w.WriteHeader(http.StatusInternalServerError) } else { w.WriteHeader(http.StatusOK) conn.Close() } } func (s *Server) handleAppProbeGRPC(w http.ResponseWriter, req *http.Request, prober *Prober) { timeout := time.Duration(prober.TimeoutSeconds) * time.Second // the DialOptions are referenced from https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L55-L59 opts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()), // credentials are currently not supported grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { d := net.Dialer{ LocalAddr: s.upstreamLocalAddress, Timeout: timeout, } return d.DialContext(ctx, "tcp", addr) }), } if userAgent := req.Header["User-Agent"]; len(userAgent) > 0 { // simulate kubelet // please refer to: // https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L56 // 
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/http/http.go#L103 opts = append(opts, grpc.WithUserAgent(userAgent[0])) } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() addr := fmt.Sprintf("%s:%d", s.appProbersDestination, prober.GRPC.Port) conn, err := grpc.DialContext(ctx, addr, opts...) if err != nil { log.Errorf("Failed to create grpc connection to probe app: %v", err) w.WriteHeader(http.StatusInternalServerError) return } defer conn.Close() var svc string if prober.GRPC.Service != nil { svc = *prober.GRPC.Service } grpcClient := grpcHealth.NewHealthClient(conn) resp, err := grpcClient.Check(ctx, &grpcHealth.HealthCheckRequest{ Service: svc, }) // the error handling is referenced from https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L88-L106 if err != nil { status, ok := grpcStatus.FromError(err) if ok { switch status.Code() { case codes.Unimplemented: log.Errorf("server does not implement the grpc health protocol (grpc.health.v1.Health): %v", err) case codes.DeadlineExceeded: log.Errorf("grpc request not finished within timeout: %v", err) default: log.Errorf("grpc probe failed: %v", err) } } else { log.Errorf("grpc probe failed: %v", err) } w.WriteHeader(http.StatusInternalServerError) return } if resp.GetStatus() == grpcHealth.HealthCheckResponse_SERVING { w.WriteHeader(http.StatusOK) return } w.WriteHeader(http.StatusInternalServerError) } func (s *Server) handleNdsz(w http.ResponseWriter, r *http.Request) { if !isRequestFromLocalhost(r) { http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden) return } nametable := s.fetchDNS() if nametable == nil { // See https://golang.org/doc/faq#nil_error for why writeJSONProto cannot handle this w.WriteHeader(http.StatusNotFound) _, _ = w.Write([]byte(`{}`)) return } writeJSONProto(w, nametable) } // writeJSONProto writes a protobuf to a json payload, handling content type, marshaling, and errors func writeJSONProto(w 
http.ResponseWriter, obj interface{}) { w.Header().Set("Content-Type", "application/json") b, err := config.ToJSON(obj) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) return } _, err = w.Write(b) if err != nil { w.WriteHeader(http.StatusInternalServerError) } } // notifyExit sends SIGTERM to itself func notifyExit() { p, err := os.FindProcess(os.Getpid()) if err != nil { log.Error(err) } if err := p.Signal(syscall.SIGTERM); err != nil { log.Errorf("failed to send SIGTERM to self: %v", err) } } // wrapIPv6 wraps the ip into "[]" in case of ipv6 func wrapIPv6(ipAddr string) string { addr := net.ParseIP(ipAddr) if addr == nil { return ipAddr } if addr.To4() != nil { return ipAddr } return fmt.Sprintf("[%s]", ipAddr) }