internal/mode/webserver/webserver.go (570 lines of code) (raw):

// Package webserver implements a web server for the GitLab Zoekt code search service. // It provides both HTTP and gRPC interfaces for searching code repositories indexed by Zoekt. // // The package handles server configuration, initialization, metrics collection, template management, // request routing, and graceful shutdown. It includes features such as: // - Command-line flag parsing for configuration // - Prometheus metrics for monitoring // - Logging with rotation // - Health monitoring via watchdog // - Disk usage monitoring // - Support for both HTML and API interfaces // - Multiplexed HTTP/gRPC handling // - Tracing via OpenTelemetry and Jaeger // - Template customization // // The server integrates with the Zoekt search engine to provide fast code search capabilities // while maintaining observability through extensive logging and metrics. package webserver import ( "context" "crypto/tls" "encoding/json" "errors" "flag" "fmt" "html/template" "io" "log" "log/slog" "net" "net/http" "net/http/httputil" "net/url" "os" "os/signal" "path/filepath" "runtime" "strconv" "strings" "sync" "time" gtlbsearch "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/search" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/shirou/gopsutil/v4/disk" sglog "github.com/sourcegraph/log" "github.com/sourcegraph/mountinfo" zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server" "github.com/sourcegraph/zoekt/grpc/internalerrs" "github.com/sourcegraph/zoekt/grpc/messagesize" proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" "github.com/sourcegraph/zoekt/query" "github.com/sourcegraph/zoekt/search" "github.com/sourcegraph/zoekt/web" "github.com/uber/jaeger-client-go" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/profiler" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/trace" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" oteltrace "go.opentelemetry.io/otel/trace" "go.uber.org/automaxprocs/maxprocs" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "google.golang.org/grpc" "github.com/opentracing/opentracing-go" "github.com/sourcegraph/zoekt" "github.com/sourcegraph/zoekt/index" ) const ( serviceName = "gitlab-zoekt-webserver" logFormat = "2006-01-02T15-04-05.999999999Z07" ) var ( metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{ Name: "zoekt_webserver_watchdog_errors", Help: "The current error count for zoekt watchdog.", }) metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "zoekt_webserver_watchdog_total", Help: "The total number of requests done by zoekt watchdog.", }) metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "zoekt_webserver_watchdog_errors_total", Help: "The total number of errors from zoekt watchdog.", }) metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "zoekt_search_requests_total", Help: "The total number of search requests that zoekt received", }) serverMetricsOnce sync.Once serverMetrics *grpcprom.ServerMetrics ) // Options contains the configuration for the webserver type Options struct { LogDir string LogRefresh time.Duration Listen string IndexDir string HTML bool EnableRPC bool EnableIndexserverProxy bool EnableLocalURLs bool EnablePprof bool HostCustomization string TemplateDir string PrintVersion bool // If true, only print version and exit } // ParseFlags parses command line arguments into Options func ParseFlags() (*Options, error) { logDir := flag.String("log_dir", "", "log to this directory rather than stderr.") logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.") listen := flag.String("listen", ":6070", "listen on this address.") indexDirFlag := flag.String("index", "", "set index directory to use (DEPRECATED: use -index_dir instead)") indexDirAliasFlag := flag.String("index_dir", "", "set index directory to use") html := flag.Bool("html", true, "enable HTML interface") enableRPC := flag.Bool("rpc", true, "enable go/net RPC") enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock") enableLocalURLs := flag.Bool("print", false, "enable local result URLs") enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.") hostCustomization := flag.String( "host_customization", "", "specify host customization, as HOST1=QUERY,HOST2=QUERY") templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files") dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.") versionFlag := flag.Bool("version", false, "Print the version and exit") secretFilePath := flag.String("secret_path", "", "gitlab shared secret file path") flag.Parse() if *versionFlag { // Version flag will be handled by the Run function return &Options{ PrintVersion: true, }, nil } if flag.NArg() == 0 && flag.NFlag() == 0 && len(os.Args) <= 2 { // If no arguments are provided, just return the help text flag.Usage() return nil, fmt.Errorf("help requested") } if *dumpTemplates { if *templateDir == "" { return nil, fmt.Errorf("must set --template_dir") } if err := writeTemplates(*templateDir); err != nil { return nil, err } // This is a special case where we want to exit early return nil, fmt.Errorf("dump_templates completed, exiting") } if *logDir != "" { if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() { return nil, fmt.Errorf("%s is not a directory", *logDir) } } indexDir := *indexDirFlag if *indexDirAliasFlag != "" { indexDir = *indexDirAliasFlag } else if *indexDirFlag != "" { // Log deprecation warning only if user explicitly used -index slog.Warn("The -index flag is deprecated, please use -index_dir instead") } if err := os.MkdirAll(indexDir, 0o755); err != nil { // nolint:gosec return nil, err } if *secretFilePath != "" { slog.Info("secret_path provided. Doing nothing with it for now", "secret_path", *secretFilePath) } return &Options{ LogDir: *logDir, LogRefresh: *logRefresh, Listen: *listen, IndexDir: indexDir, HTML: *html, EnableRPC: *enableRPC, EnableIndexserverProxy: *enableIndexserverProxy, EnableLocalURLs: *enableLocalURLs, EnablePprof: *enablePprof, HostCustomization: *hostCustomization, TemplateDir: *templateDir, }, nil } // Run starts the webserver service with the provided options and version info func Run(opts *Options, version, buildTime string) error { // Handle version flag if set if opts.PrintVersion { fmt.Printf("%s %s (built at: %s)\n", os.Args[0], version, buildTime) // nolint:forbidigo return nil } if opts.LogDir != "" { go divertLogs(opts.LogDir, opts.LogRefresh) } resource := sglog.Resource{ Name: serviceName, Version: version, InstanceID: index.HostnameBestEffort(), } liblog := sglog.Init(resource) defer liblog.Sync() p := profiler.NewProfiler() p.Init(serviceName, version) // Tune GOMAXPROCS to match Linux container CPU quota. _, _ = maxprocs.Set() metricsLogger := sglog.Scoped("metricsRegistration") mustRegisterDiskMonitor(opts.IndexDir) mustRegisterMemoryMapMetrics(metricsLogger) mountOpts := mountinfo.CollectorOpts{Namespace: "gitlab_zoekt_webserver"} c := mountinfo.NewCollector(metricsLogger, mountOpts, map[string]string{"indexDir": opts.IndexDir}) prometheus.DefaultRegisterer.MustRegister(c) // Do not block on loading shards so we can become partially available // sooner. Otherwise on large instances zoekt can be unavailable on the // order of minutes. searcher, err := search.NewDirectorySearcherFast(opts.IndexDir) if err != nil { return err } searcher = &loggedSearcher{ Streamer: searcher, Logger: sglog.Scoped("searcher"), } s := &web.Server{ Searcher: searcher, Top: web.Top, Version: index.Version, } if opts.TemplateDir != "" { if templateErr := loadTemplates(s.Top, opts.TemplateDir); templateErr != nil { return fmt.Errorf("loadTemplates: %w", templateErr) } } s.Print = opts.EnableLocalURLs s.HTML = opts.HTML s.RPC = opts.EnableRPC if opts.HostCustomization != "" { s.HostCustomQueries = map[string]string{} for _, h := range strings.Split(opts.HostCustomization, ",") { if len(h) == 0 { continue } fields := strings.SplitN(h, "=", 2) if len(fields) < 2 { return fmt.Errorf("invalid host_customization %q", h) } s.HostCustomQueries[fields[0]] = fields[1] } } serveMux, err := web.NewMux(s) if err != nil { return err } gtlbSearchClient := gtlbsearch.NewSearcher(http.DefaultClient) serveMux.HandleFunc("/webserver/api/v2/search", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } result, err := gtlbSearchClient.Search(r) if err != nil { http.Error(w, fmt.Sprintf("v2 search error: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") encodingErr := json.NewEncoder(w).Encode(result) if encodingErr != nil { http.Error(w, fmt.Sprintf("v2 search encoding error: %v", encodingErr), http.StatusInternalServerError) return } }) // Register Prometheus metrics endpoint serveMux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{})) if opts.EnableIndexserverProxy { socket := filepath.Join(opts.IndexDir, "indexserver.sock") sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket)) addProxyHandler(serveMux, socket) } handler := trace.Middleware(serveMux) // Sourcegraph: We use environment variables to configure watchdog since // they are more convenient than flags in containerized environments. watchdogTick := 30 * time.Second if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" { watchdogTick, _ = time.ParseDuration(v) log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick) } watchdogErrCount := 3 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" { watchdogErrCount, _ = strconv.Atoi(v) log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount) } watchdogAddr := "http://" + opts.Listen watchdogAddr += "/healthz" if watchdogErrCount > 0 && watchdogTick > 0 { go watchdog(watchdogTick, watchdogErrCount, watchdogAddr) } else { log.Println("watchdog disabled") } logger := sglog.Scoped("ZoektWebserverGRPCServer") streamer := web.NewTraceAwareSearcher(s.Searcher) grpcServer := newGRPCServer(logger, streamer) handler = multiplexGRPC(grpcServer, handler) srv := &http.Server{ // nolint:gosec Addr: opts.Listen, Handler: handler, } // Start the server in a goroutine serverErrCh := make(chan error, 1) go func() { sglog.Scoped("server").Info("starting server", sglog.Stringp("address", &opts.Listen)) if err := srv.ListenAndServe(); err != http.ErrServerClosed { serverErrCh <- err } close(serverErrCh) }() // Wait for shutdown signal stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, PlatformSigterm) // Wait for either server error or shutdown signal select { case err := <-serverErrCh: if err != nil { return fmt.Errorf("server error: %w", err) } case <-stop: // Shutdown received } // Gracefully close any federated connections if err := gtlbSearchClient.Close(); err != nil { return fmt.Errorf("failed to close searcher: %w", err) } // Gracefully shutdown the server if s.RPC { return srv.Close() } // Wait for 10s to drain ongoing requests ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() log.Printf("shutting down") return srv.Shutdown(ctx) } func divertLogs(dir string, interval time.Duration) { t := time.NewTicker(interval) var last *os.File for { nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())) fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm) f, err := os.Create(nm) // nolint:gosec if err != nil { // There is not much we can do now. fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err) os.Exit(2) } log.SetOutput(f) if last != nil { last.Close() // nolint:errcheck,gosec } last = f <-t.C } } const templateExtension = ".html.tpl" func loadTemplates(tpl *template.Template, dir string) error { fs, err := filepath.Glob(dir + "/*" + templateExtension) if err != nil { log.Fatalf("Glob: %v", err) } log.Printf("loading templates: %v", fs) for _, fn := range fs { content, err := os.ReadFile(fn) // nolint:gosec if err != nil { return err } base := filepath.Base(fn) base = strings.TrimSuffix(base, templateExtension) if _, err := tpl.New(base).Parse(string(content)); err != nil { return fmt.Errorf("template.Parse(%s): %v", fn, err) // nolint:errorlint } } return nil } func writeTemplates(dir string) error { for k, v := range web.TemplateText { nm := filepath.Join(dir, k+templateExtension) if err := os.WriteFile(nm, []byte(v), 0o644); err != nil { // nolint:gosec return err } } return nil } func watchdogOnce(ctx context.Context, client *http.Client, addr string) error { defer metricWatchdogTotal.Inc() ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) defer cancel() req, err := http.NewRequest("GET", addr, nil) if err != nil { return err } req = req.WithContext(ctx) resp, err := client.Do(req) if err != nil { return err } body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body)) } return nil } func watchdog(dt time.Duration, maxErrCount int, addr string) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // nolint:gosec } client := &http.Client{ Transport: tr, } tick := time.NewTicker(dt) errCount := 0 for range tick.C { err := watchdogOnce(context.Background(), client, addr) if err != nil { errCount++ metricWatchdogErrors.Set(float64(errCount)) metricWatchdogErrorsTotal.Inc() if errCount >= maxErrCount { log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3. Final error: %v Possible remediations: - If this rarely happens, ignore and let your process manager restart zoekt. - Possibly under provisioned. Try increasing CPU or disk IO. - A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err) os.Exit(3) } else { log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err) } } else if errCount > 0 { errCount = 0 metricWatchdogErrors.Set(float64(errCount)) log.Printf("watchdog: success, resetting error count") } } } func diskUsage(path string) (*disk.UsageStat, error) { duPath := path if runtime.GOOS == "windows" { duPath = filepath.VolumeName(duPath) } usage, err := disk.Usage(duPath) if err != nil { return nil, fmt.Errorf("diskUsage: %w", err) } return usage, err } func mustRegisterDiskMonitor(path string) { prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "src_disk_space_available_bytes", Help: "Amount of free space disk space.", ConstLabels: prometheus.Labels{"path": path}, }, func() float64 { // I know there is no error handling here, and I don't like it // but there was no error handling in the previous version // that used Statfs, either, so I'm assuming there's no need for it usage, _ := diskUsage(path) return float64(usage.Free) })) prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "src_disk_space_total_bytes", Help: "Amount of total disk space.", ConstLabels: prometheus.Labels{"path": path}, }, func() float64 { // I know there is no error handling here, and I don't like it // but there was no error handling in the previous version // that used Statfs, either, so I'm assuming there's no need for it usage, _ := diskUsage(path) return float64(usage.Total) })) } type loggedSearcher struct { zoekt.Streamer Logger sglog.Logger } func (s *loggedSearcher) Search( ctx context.Context, q query.Q, opts *zoekt.SearchOptions, ) (sr *zoekt.SearchResult, err error) { defer func() { var stats *zoekt.Stats if sr != nil { stats = &sr.Stats } s.log(ctx, q, opts, stats, err) }() metricSearchRequestsTotal.Inc() return s.Streamer.Search(ctx, q, opts) } func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) { logger := s.Logger. WithTrace(traceContext(ctx)). With( sglog.String("query", q.String()), sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount), sglog.Bool("opts.Whole", opts.Whole), sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount), sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount), sglog.Duration("opts.MaxWallTime", opts.MaxWallTime), sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount), sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount), ) if err != nil { switch { case errors.Is(err, context.Canceled): logger.Warn("search canceled", sglog.Error(err)) case errors.Is(err, context.DeadlineExceeded): logger.Warn("search timeout", sglog.Error(err)) default: logger.Error("search failed", sglog.Error(err)) } return } if st == nil { return } logger.Debug("search", sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded), sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded), sglog.Int("stat.Crashes", st.Crashes), sglog.Duration("stat.Duration", st.Duration), sglog.Int("stat.FileCount", st.FileCount), sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered), sglog.Int("stat.FilesConsidered", st.FilesConsidered), sglog.Int("stat.FilesLoaded", st.FilesLoaded), sglog.Int("stat.FilesSkipped", st.FilesSkipped), sglog.Int("stat.ShardsScanned", st.ShardsScanned), sglog.Int("stat.ShardsSkipped", st.ShardsSkipped), sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter), sglog.Int("stat.MatchCount", st.MatchCount), sglog.Int("stat.NgramMatches", st.NgramMatches), sglog.Int("stat.NgramLookups", st.NgramLookups), sglog.Duration("stat.Wait", st.Wait), sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction), sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch), sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered), sglog.String("stat.FlushReason", st.FlushReason.String()), ) } func traceContext(ctx context.Context) sglog.TraceContext { otSpan := opentracing.SpanFromContext(ctx) if otSpan != nil { if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok { return sglog.TraceContext{ TraceID: jaegerSpan.TraceID().String(), SpanID: jaegerSpan.SpanID().String(), } } } if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() { return sglog.TraceContext{ TraceID: otelSpan.TraceID().String(), SpanID: otelSpan.SpanID().String(), } } return sglog.TraceContext{} } // multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the // request handling. Any requests that declare themselves as gRPC requests are routed // to the gRPC server, all others are routed to the httpHandler. func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { grpcServer.ServeHTTP(w, r) } else { httpHandler.ServeHTTP(w, r) } }) // Until we enable TLS, we need to fall back to the h2c protocol, which is // basically HTTP2 without TLS. The standard library does not implement the // h2s protocol, so this hijacks h2s requests and handles them correctly. return h2c.NewHandler(newHandler, &http2.Server{}) } // addProxyHandler adds a handler to "mux" that proxies all requests with base // /indexserver to "socket". func addProxyHandler(mux *http.ServeMux, socket string) { proxy := httputil.NewSingleHostReverseProxy(&url.URL{ Scheme: "http", // The value of "Host" is arbitrary, because it is ignored by the // DialContext we use for the socket connection. Host: "socket", }) proxy.Transport = &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { var d net.Dialer return d.DialContext(ctx, "unix", socket) }, } mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", http.HandlerFunc(proxy.ServeHTTP))) } // newGRPCServer creates a new gRPC server for the webserver func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server { metrics := mustGetServerMetrics() opts := []grpc.ServerOption{ grpc.ChainStreamInterceptor( otelgrpc.StreamServerInterceptor(), // nolint:staticcheck metrics.StreamServerInterceptor(), messagesize.StreamServerInterceptor, internalerrs.LoggingStreamServerInterceptor(logger), ), grpc.ChainUnaryInterceptor( otelgrpc.UnaryServerInterceptor(), // nolint:staticcheck metrics.UnaryServerInterceptor(), messagesize.UnaryServerInterceptor, internalerrs.LoggingUnaryServerInterceptor(logger), ), } opts = append(opts, additionalOpts...) // Ensure that the message size options are set last, so they override any other // server-specific options that tweak the message size. // // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they // take precedence over everything else with a uniform size setting that's easy to reason about. opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...) s := grpc.NewServer(opts...) proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer)) return s } // mustGetServerMetrics returns a singleton instance of the server metrics // that are shared across all gRPC servers that this process creates. // // This function panics if the metrics cannot be registered with the default // Prometheus registry. func mustGetServerMetrics() *grpcprom.ServerMetrics { serverMetricsOnce.Do(func() { serverMetrics = grpcprom.NewServerMetrics( grpcprom.WithServerCounterOptions(), grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request) ) prometheus.DefaultRegisterer.MustRegister(serverMetrics) }) return serverMetrics }