in internal/mode/webserver/webserver.go [202:394]
// Run configures and serves the webserver described by opts, blocking until
// the HTTP server fails or an interrupt/termination signal is received.
//
// When opts.PrintVersion is set it only prints the binary name, version and
// buildTime and returns nil. Otherwise it initialises logging, profiling and
// Prometheus collectors, opens a searcher over opts.IndexDir, builds the HTTP
// mux (handlers from web.NewMux, the v2 search endpoint, /metrics and,
// optionally, the indexserver reverse proxy), multiplexes it with the gRPC
// server and listens on opts.Listen.
//
// It returns an error if setup fails, if the HTTP server stops with an error
// other than http.ErrServerClosed, or if closing the v2 search client or
// stopping the HTTP server fails.
func Run(opts *Options, version, buildTime string) error {
	// Handle version flag if set
	if opts.PrintVersion {
		fmt.Printf("%s %s (built at: %s)\n", os.Args[0], version, buildTime) // nolint:forbidigo
		return nil
	}

	if opts.LogDir != "" {
		go divertLogs(opts.LogDir, opts.LogRefresh)
	}

	resource := sglog.Resource{
		Name:       serviceName,
		Version:    version,
		InstanceID: index.HostnameBestEffort(),
	}
	liblog := sglog.Init(resource)
	defer liblog.Sync()

	p := profiler.NewProfiler()
	p.Init(serviceName, version)

	// Tune GOMAXPROCS to match Linux container CPU quota. This is best
	// effort: a failure leaves the runtime default in place.
	_, _ = maxprocs.Set()

	metricsLogger := sglog.Scoped("metricsRegistration")
	mustRegisterDiskMonitor(opts.IndexDir)
	mustRegisterMemoryMapMetrics(metricsLogger)

	mountOpts := mountinfo.CollectorOpts{Namespace: "gitlab_zoekt_webserver"}
	c := mountinfo.NewCollector(metricsLogger, mountOpts, map[string]string{"indexDir": opts.IndexDir})
	prometheus.DefaultRegisterer.MustRegister(c)

	// Do not block on loading shards so we can become partially available
	// sooner. Otherwise on large instances zoekt can be unavailable on the
	// order of minutes.
	searcher, err := search.NewDirectorySearcherFast(opts.IndexDir)
	if err != nil {
		return err
	}
	searcher = &loggedSearcher{
		Streamer: searcher,
		Logger:   sglog.Scoped("searcher"),
	}

	s := &web.Server{
		Searcher: searcher,
		Top:      web.Top,
		Version:  index.Version,
	}
	if opts.TemplateDir != "" {
		if templateErr := loadTemplates(s.Top, opts.TemplateDir); templateErr != nil {
			return fmt.Errorf("loadTemplates: %w", templateErr)
		}
	}
	s.Print = opts.EnableLocalURLs
	s.HTML = opts.HTML
	s.RPC = opts.EnableRPC

	// opts.HostCustomization is a comma-separated list of key=value pairs;
	// empty entries are skipped and entries without "=" are rejected.
	if opts.HostCustomization != "" {
		s.HostCustomQueries = map[string]string{}
		for _, h := range strings.Split(opts.HostCustomization, ",") {
			if len(h) == 0 {
				continue
			}
			fields := strings.SplitN(h, "=", 2)
			if len(fields) < 2 {
				return fmt.Errorf("invalid host_customization %q", h)
			}
			s.HostCustomQueries[fields[0]] = fields[1]
		}
	}

	serveMux, err := web.NewMux(s)
	if err != nil {
		return err
	}

	gtlbSearchClient := gtlbsearch.NewSearcher(http.DefaultClient)
	serveMux.HandleFunc("/webserver/api/v2/search", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		result, err := gtlbSearchClient.Search(r)
		if err != nil {
			http.Error(w, fmt.Sprintf("v2 search error: %v", err), http.StatusInternalServerError)
			return
		}
		// Marshal before touching the response so that an encoding failure
		// can still be reported with a clean 500, and a write failure is not
		// answered with http.Error on a response that has already started.
		payload, encodingErr := json.Marshal(result)
		if encodingErr != nil {
			http.Error(w, fmt.Sprintf("v2 search encoding error: %v", encodingErr), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		// Terminate the document with a newline, as json.Encoder.Encode does.
		if _, writeErr := w.Write(append(payload, '\n')); writeErr != nil {
			log.Printf("v2 search: writing response: %v", writeErr)
		}
	})

	// Register Prometheus metrics endpoint
	serveMux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))

	if opts.EnableIndexserverProxy {
		socket := filepath.Join(opts.IndexDir, "indexserver.sock")
		sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
		addProxyHandler(serveMux, socket)
	}

	handler := trace.Middleware(serveMux)

	// Sourcegraph: We use environment variables to configure watchdog since
	// they are more convenient than flags in containerized environments.
	// An unparsable value is reported and the default is kept, so a typo does
	// not silently turn the watchdog off; an explicit 0 still disables it.
	watchdogTick := 30 * time.Second
	if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" {
		if d, parseErr := time.ParseDuration(v); parseErr != nil {
			log.Printf("ignoring invalid ZOEKT_WATCHDOG_TICK=%q: %v", v, parseErr)
		} else {
			watchdogTick = d
			log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick)
		}
	}
	watchdogErrCount := 3
	if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" {
		if n, parseErr := strconv.Atoi(v); parseErr != nil {
			log.Printf("ignoring invalid ZOEKT_WATCHDOG_ERRORS=%q: %v", v, parseErr)
		} else {
			watchdogErrCount = n
			log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount)
		}
	}
	watchdogAddr := "http://" + opts.Listen
	watchdogAddr += "/healthz"
	if watchdogErrCount > 0 && watchdogTick > 0 {
		go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
	} else {
		log.Println("watchdog disabled")
	}

	logger := sglog.Scoped("ZoektWebserverGRPCServer")
	streamer := web.NewTraceAwareSearcher(s.Searcher)
	grpcServer := newGRPCServer(logger, streamer)
	handler = multiplexGRPC(grpcServer, handler)

	srv := &http.Server{ // nolint:gosec
		Addr:    opts.Listen,
		Handler: handler,
	}

	// Start the server in a goroutine. The channel is buffered so the
	// goroutine can always deliver its error and exit, even if Run has
	// already moved on to shutdown.
	serverErrCh := make(chan error, 1)
	go func() {
		sglog.Scoped("server").Info("starting server", sglog.Stringp("address", &opts.Listen))
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			serverErrCh <- err
		}
		close(serverErrCh)
	}()

	// Wait for shutdown signal
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, PlatformSigterm)
	defer signal.Stop(stop)

	// Wait for either server error or shutdown signal
	select {
	case err := <-serverErrCh:
		if err != nil {
			return fmt.Errorf("server error: %w", err)
		}
	case <-stop:
		// Shutdown received
	}

	// Gracefully close any federated connections. The error is held back so
	// that the HTTP server is still stopped below even when closing fails.
	closeErr := gtlbSearchClient.Close()

	var shutdownErr error
	if s.RPC {
		// With RPC enabled the server is closed immediately instead of
		// drained.
		// NOTE(review): presumably because long-lived RPC connections would
		// stall a graceful drain — confirm.
		shutdownErr = srv.Close()
	} else {
		// Wait for 10s to drain ongoing requests
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		log.Printf("shutting down")
		shutdownErr = srv.Shutdown(ctx)
	}

	if closeErr != nil {
		// The close error keeps priority as the returned error; a concurrent
		// shutdown failure is logged so it is not lost.
		if shutdownErr != nil {
			log.Printf("server shutdown error: %v", shutdownErr)
		}
		return fmt.Errorf("failed to close searcher: %w", closeErr)
	}
	return shutdownErr
}