internal/mode/indexer/indexer.go (168 lines of code) (raw):

// Package indexer provides the main entry point and orchestration for the GitLab Zoekt indexer service. // It handles command-line argument parsing, service initialization, and coordinates the various // components needed for code search indexing. // // The package implements: // - Command-line flag parsing and configuration management // - Service lifecycle (startup, shutdown, signal handling) // - Integration with GitLab for task requests and callbacks // - HTTP server initialization for the indexing API // - Background tasks for file cleaning and maintenance // // The indexer service connects to GitLab, processes indexing requests, and maintains // search indexes that enable fast code search functionality. package indexer import ( "context" "flag" "fmt" "log/slog" "net/http" "os" "os/signal" "runtime" "syscall" "time" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/callback" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/indexing_lock" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/middleware_logger" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/node_uuid" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/profiler" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/secretreader" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/server" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/task_request" "go.uber.org/automaxprocs/maxprocs" "golang.org/x/sync/errgroup" ) const ( serviceName = "gitlab-zoekt-indexer" serverShutdownDelay = 10 * time.Minute ) // Options contains the configuration for the indexer type Options struct { IndexDir string PathPrefix string Listen string NodeName string NodeUUID string SelfURL string SearchURL string GitlabURL string SecretFilePath string PrintVersion bool // If true, only print version and exit } // ParseFlags parses command line arguments into Options func ParseFlags() (*Options, error) { indexDir := flag.String("index_dir", "", "directory holding index shards.") pathPrefix := flag.String("path_prefix", "/indexer", "prefix for the routes") nodeNameFlag := flag.String("node_name", "", "name of the node") selfURL := flag.String("self_url", "", "the URL to reach the node") searchURL := flag.String("search_url", "", "the URL to reach the webserver if it differs from self_url") gitlabURL := flag.String("gitlab_url", "", "gitlab URL") secretFilePath := flag.String("secret_path", "", "gitlab shared secret file path") listen := flag.String("listen", ":6060", "listen on this address.") versionFlag := flag.Bool("version", false, "Print the version and exit") flag.Parse() if *versionFlag { // Version flag will be handled by the Run function return &Options{ PrintVersion: true, }, nil } if *indexDir == "" { if flag.NArg() == 0 && flag.NFlag() == 0 && len(os.Args) <= 2 { // If no arguments are provided, just return the help text flag.Usage() return nil, fmt.Errorf("must set -index_dir") } err := fmt.Errorf("must set -index_dir") return nil, err } if err := server.CreateIndexDir(*indexDir); err != nil { return nil, err } nodeName := *nodeNameFlag if nodeName == "" { hostName, err := os.Hostname() if err != nil { return nil, err } nodeName = hostName } n := node_uuid.NewNodeUUID(*indexDir) nodeUUID, err := n.Get() if err != nil { return nil, err } if *searchURL == "" { *searchURL = *selfURL } return &Options{ IndexDir: *indexDir, PathPrefix: *pathPrefix, Listen: *listen, NodeName: nodeName, NodeUUID: nodeUUID, SelfURL: *selfURL, SearchURL: *searchURL, GitlabURL: *gitlabURL, SecretFilePath: *secretFilePath, }, nil } // Run starts the indexer service with the provided options and version info func Run(opts *Options, version, buildTime string) error { // Handle version flag if set if opts.PrintVersion { fmt.Printf("%s %s (built at: %s)\n", os.Args[0], version, buildTime) // nolint:forbidigo return nil } middleware_logger.SetUpLogger() var callbackAPIInstance callback.CallbackAPI // Tune GOMAXPROCS to match Linux container CPU quota. _, _ = maxprocs.Set() indexingLock := indexing_lock.NewIndexingLock() ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() g, ctx := errgroup.WithContext(ctx) s := &server.IndexServer{ PathPrefix: opts.PathPrefix, IndexBuilder: server.DefaultIndexBuilder{ IndexDir: opts.IndexDir, }, IndexingLock: indexingLock, } if opts.GitlabURL != "" && opts.SelfURL != "" { secret, err := secretreader.ReadSecret(opts.SecretFilePath) if err != nil { return fmt.Errorf("could not read secret from the file_path: %s %w", opts.SecretFilePath, err) } callbackAPIInstance, err = callback.NewCallbackAPI(opts.GitlabURL, opts.NodeUUID, secret, &http.Client{}) if err != nil { return err } s.CallbackAPI = callbackAPIInstance concurrency := runtime.GOMAXPROCS(0) slog.Info("starting taskRequest", "node_name", opts.NodeName, "index_url", opts.SelfURL, "search_url", opts.SelfURL, "gitlab_url", opts.GitlabURL, "concurrency", concurrency) taskRequest, err := task_request.NewTaskRequestTimer(&task_request.NewTaskRequestTimerParams{ IndexDir: opts.IndexDir, NodeName: opts.NodeName, NodeUUID: opts.NodeUUID, Version: version, SelfURL: opts.SelfURL, SearchURL: opts.SearchURL, GitlabURL: opts.GitlabURL, Secret: secret, Concurrency: concurrency, IndexingLock: indexingLock, }) if err != nil { return err } g.Go(func() error { return taskRequest.Start(ctx, s) }) } p := profiler.NewProfiler() p.Init(serviceName, version) httpServer := &http.Server{ //nolint:gosec Addr: opts.Listen, Handler: s.Router(), } g.Go(func() error { if err := s.StartIndexingAPI(httpServer); err != nil && err != http.ErrServerClosed { //nolint:errorlint return fmt.Errorf("failed to start indexing API: %w", err) } return nil }) g.Go(func() error { return s.StartFileCleaner(ctx) }) <-ctx.Done() slog.Info("gracefully shutting down...") g.Go(func() error { timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), serverShutdownDelay) defer timeoutCancel() return httpServer.Shutdown(timeoutCtx) }) return g.Wait() }