pkg/server/server.go (162 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "context" "encoding/json" "fmt" "net/http" "sync" "time" "github.com/GoogleCloudPlatform/gke-prober/pkg/common" "github.com/GoogleCloudPlatform/gke-prober/pkg/k8s" "github.com/GoogleCloudPlatform/gke-prober/pkg/metrics" "github.com/GoogleCloudPlatform/gke-prober/pkg/probe" "github.com/GoogleCloudPlatform/gke-prober/pkg/scheduler" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/klog/v2" ) // A HTTP server that hanldes Readiness and Liveness probe // You choose between this built-in server or your own custom server type Server struct { httpserver *http.Server clientSet *kubernetes.Clientset provider metrics.Provider cw k8s.ClusterWatcher cr metrics.ClusterRecorder pr metrics.ProbeRecorder nw k8s.NodeWatcher nr metrics.NodeRecorder Config common.Config // tickStatusMux protects tick fields tickStatusMux sync.RWMutex // tickLastStart is equal to start time of last unfinished tick tickLastStart time.Time } // New return a http server instnace func NewServer(lchecks *Checks, rchecks *Checks) *Server { s := &Server{ Config: common.GetConfig(), } mux := http.NewServeMux() // Register probers (liveness and Readiness) // The default liveness probe is tickSerivce unless specified if lchecks != nil { mux.Handle("/liveness", lchecks.Healthz()) } else { mux.Handle("/liveness", s.Healthz()) } s.httpserver = &http.Server{ Addr: ":8080", Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second, } return s } func (s *Server) RunUntil(ctx context.Context, kubeconfig string) error { ctx, cancel := context.WithCancel(ctx) defer cancel() go s.graceshutdown(ctx) klog.Infof("starting the server with config: %+v\n", s.Config) var err error s.clientSet = k8s.ClientOrDie(kubeconfig) s.provider, err = metrics.StartGCM(ctx, s.Config) if err != nil { panic(err.Error()) } if s.Config.Mode == common.ModeCluster { s.cr = s.provider.ClusterRecorder() s.cw = k8s.NewClusterWatcher(s.clientSet) // start informers s.cw.StartClusterWatches(ctx.Done()) if s.Config.ClusterProbes { s.pr = s.provider.ProbeRecorder() } go s.runScrape(ctx) } else { s.nr = s.provider.NodeRecorder() s.nw = k8s.NewNodeWatcher(s.clientSet, s.Config.NodeName) s.nw.StartNodeWatches(ctx.Done()) go s.runScrape(ctx) } klog.V(1).Infof("[gke-prober Livenes and Readiness Server]: Lisenting on %s \n", s.httpserver.Addr) return s.httpserver.ListenAndServe() } func (s *Server) runScrape(ctx context.Context) { ticker := time.NewTicker(s.Config.ReportInterval) defer ticker.Stop() s.tick(ctx, time.Now()) for { select { case startTime := <-ticker.C: s.tick(ctx, startTime) case <-ctx.Done(): return } } } // Every tick trigges one metrics scraping cycle // Scrping should complete within the interval, or time out func (s *Server) tick(ctx context.Context, startTime time.Time) { s.tickStatusMux.Lock() s.tickLastStart = startTime s.tickStatusMux.Unlock() ctx, cancelTimeout := context.WithTimeout(ctx, s.Config.ReportInterval) defer cancelTimeout() klog.V(1).Infof("[%s] Scraping metrics starts\n", startTime.Format(time.RFC3339)) if common.ModeCluster == s.Config.Mode { scheduler.RecordClusterMetrics(ctx, s.cr, s.cw.GetNodes(), s.cw.GetDaemonSets(), s.cw.GetDeployments()) if s.Config.ClusterProbes { scheduler.RecordClusterProbeMetrics(ctx, s.clientSet, s.pr, probe.ClusterProbes()) } } else { scheduler.RecordNodeMetrics(ctx, s.nr, s.Config, s.nw.GetNodes(), s.nw.GetPods(), probe.NodeProbes()) } collectTime := time.Since(startTime) klog.V(1).Infof("[%s] Scraping cycle ends. Duration(seconds): %f\n", time.Now().Format(time.RFC3339), float64(collectTime)/float64(time.Second)) } func (s *Server) graceshutdown(ctx context.Context) { <-ctx.Done() newctx, cancelTimeout := context.WithTimeout(context.Background(), 30*time.Second) defer cancelTimeout() var err error if err = s.provider.Close(); err != nil { klog.Warningf("Cannot close the connection to provider's API services due to [%s]", err.Error()) } s.httpserver.SetKeepAlivesEnabled(false) if err = s.httpserver.Shutdown(newctx); err != nil { klog.Warningf("Health Check Server failed to shutdown due to [%s]", err.Error()) } } func (s *Server) Healthz() http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") errs := []Error{} svcs := []Service{} svcs = append(svcs, Service{ Name: "default liveness check", Healthy: true, }) s.tickStatusMux.RLock() lastTickStart := s.tickLastStart s.tickStatusMux.RUnlock() maxTickWait := time.Duration(1.5 * float64(s.Config.ReportInterval)) tickWait := time.Since(lastTickStart) klog.V(2).Infof("Liveness check by kubelet, tickWait is %s\n", tickWait.String()) if !lastTickStart.IsZero() && tickWait > maxTickWait { klog.Warningf("Failed default Liveness probe, [tickWait]:%d exceeds [maxTickWait]:%d", tickWait, maxTickWait) err := fmt.Sprint("Tick not finished on time") errs = append(errs, Error{ Name: "Default liveness probe", Message: err, }) } response := Response{ Service: svcs, Errors: errs, Healthy: true, } if len(errs) > 0 { response.Healthy = false w.WriteHeader(http.StatusServiceUnavailable) } else { w.WriteHeader(http.StatusOK) } json, err := json.Marshal(response) if err != nil { klog.Warning(err.Error()) } w.Write(json) }) }