internal/server/server.go (293 lines of code) (raw):

// Package server implements a complete HTTP API server for the GitLab Zoekt indexer service. // It provides endpoints for managing code search indexes, including creation, deletion, // status checking, and maintenance operations. // // The package consists of several components: // - IndexServer: The main server that handles HTTP requests and manages the indexing process // - Router: Configures routes and middleware for the HTTP API // - DefaultIndexBuilder: Implements repository indexing and deletion operations // - Request/response types: Defines the data structures for API communication // // The server handles concurrent indexing requests with locking mechanisms, tracks metrics // via Prometheus, implements callbacks to report operation results back to GitLab, and // provides standardized JSON responses for all API operations. It integrates with Gitaly // for repository access and manages the lifecycle of search indexes. package server import ( "context" "encoding/json" "errors" "fmt" "log/slog" "net/http" "os" "strconv" "time" "github.com/go-chi/chi/v5" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/callback" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/debug_ls" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/file_cleaner" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/indexer" ) const ( concurrencyLimitEnv = "ZOEKT_INDEXER_CONCURRENCY_LIMIT" cleanupInterval = 5 * time.Minute ) func (s *IndexServer) StartIndexingAPI(httpServer *http.Server) error { s.initMetrics() if err := s.createIndexDir(); err != nil { return err } slog.Info("starting server", "address", httpServer.Addr, "path_prefix", s.PathPrefix) if err := httpServer.ListenAndServe(); err != nil { return err } return nil } func (s *IndexServer) StartFileCleaner(ctx context.Context) error { tmpCleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock) err := tmpCleaner.Init() if err != nil { return err } return tmpCleaner.StartCleanInterval(ctx, cleanupInterval) } func (s *IndexServer) createIndexDir() error { return CreateIndexDir(s.IndexBuilder.GetIndexDir()) } func CreateIndexDir(d string) error { if err := os.MkdirAll(d, 0o755); err != nil { //nolint:gosec return fmt.Errorf("createIndexDir %s: %w", d, err) } return nil } func (s *IndexServer) handleStatus() http.HandlerFunc { route := "status" type response struct { Success bool SHA string } return func(w http.ResponseWriter, r *http.Request) { param := chi.URLParam(r, "id") repoID, err := strconv.ParseUint(param, 10, 32) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } idx := &indexer.Indexer{ IndexDir: s.IndexBuilder.GetIndexDir(), ProjectID: uint32(repoID), } currentSHA, ok, err := idx.CurrentSHA() if err != nil { s.respondWithError(w, r, route, err, http.StatusInternalServerError) return } if !ok { s.respondWithStatus(w, r, route, http.StatusNotFound) return } resp := response{ Success: true, SHA: currentSHA, } s.respondWith(w, r, route, resp) } } func (s *IndexServer) handleDelete() http.HandlerFunc { route := "delete" return func(w http.ResponseWriter, r *http.Request) { param := chi.URLParam(r, "id") rID, err := strconv.ParseUint(param, 10, 32) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } repoID := uint32(rID) if !s.IndexingLock.TryLock(repoID) { s.respondWithError(w, r, route, errors.New("indexing is in progress"), http.StatusLocked) return } defer s.IndexingLock.Unlock(repoID) deleteRequest := DeleteRequest{ RepoID: repoID, Callback: &callback.CallbackParams{ Name: "delete", RailsPayload: map[string]uint32{"RepoId": repoID}, }, } err = s.IndexBuilder.DeleteRepository( deleteRequest, callback.CallbackFunc{ OnSuccess: func(params callback.CallbackParams) { //nolint:contextcheck s.CallbackAPI.SendSuccess(r.Context(), params, s.IndexBuilder.GetIndexDir(), deleteRequest.RepoID) }, OnFailure: func(params callback.CallbackParams, errorReason error) { //nolint:contextcheck s.CallbackAPI.SendFailure(r.Context(), params, s.IndexBuilder.GetIndexDir(), deleteRequest.RepoID, errorReason) }, }, s.IndexingLock, ) if err != nil { err = fmt.Errorf("failed to remove shards for repoID: %d from %v with error: %w", repoID, s.IndexBuilder.GetIndexDir(), err) s.respondWithError(w, r, route, err, http.StatusInternalServerError) return } resp := struct { Success bool }{ Success: true, } s.respondWith(w, r, route, resp) } } func (s *IndexServer) handleMetrics() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{Registry: s.promRegistry}).ServeHTTP(w, r) } } func (s *IndexServer) handleDebugLs() http.HandlerFunc { route := "debug/ls" return func(w http.ResponseWriter, r *http.Request) { response, err := debug_ls.ListFiles(s.IndexBuilder.GetIndexDir()) if err != nil { s.respondWithError(w, r, route, err, http.StatusInternalServerError) return } s.respondWith(w, r, route, response) } } func (s *IndexServer) decode(r *http.Request, v interface{}) error { dec := json.NewDecoder(r.Body) dec.DisallowUnknownFields() return dec.Decode(v) } func (s *IndexServer) overIndexingLimit() bool { val, ok := os.LookupEnv(concurrencyLimitEnv) if !ok { return false } limit, err := strconv.ParseInt(val, 10, 32) if err != nil { return false } return s.IndexingLock.InProgressCount() >= int(limit) } func (s *IndexServer) handleIndex() http.HandlerFunc { route := "index" type response struct { Success bool } parseRequest := func(r *http.Request) (IndexRequest, error) { var req IndexRequest err := s.decode(r, &req) if err != nil { return req, errors.New("json parser error") } return req, nil } return func(w http.ResponseWriter, r *http.Request) { req, err := parseRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if s.overIndexingLimit() { http.Error(w, "over concurrency limit", http.StatusTooManyRequests) return } if !s.IndexingLock.TryLock(req.RepoID) { s.respondWithError(w, r, route, errors.New("indexing is already in progress"), http.StatusLocked) return } defer s.IndexingLock.Unlock(req.RepoID) err = s.IndexBuilder.IndexRepository( r.Context(), req, callback.CallbackFunc{ OnSuccess: func(params callback.CallbackParams) { //nolint:contextcheck s.CallbackAPI.SendSuccess(r.Context(), params, s.IndexBuilder.GetIndexDir(), req.RepoID) }, OnFailure: func(params callback.CallbackParams, errorReason error) { //nolint:contextcheck s.CallbackAPI.SendFailure(r.Context(), params, s.IndexBuilder.GetIndexDir(), req.RepoID, errorReason) }, }, ) if err != nil { s.respondWithError(w, r, route, err, http.StatusInternalServerError) return } resp := response{ Success: true, } s.respondWith(w, r, route, resp) } } func (s *IndexServer) handleTruncate() http.HandlerFunc { route := "truncate" type response struct { Success bool } return func(w http.ResponseWriter, r *http.Request) { if err := s.IndexingLock.LockAll(); err != nil { s.respondWithError(w, r, route, err, http.StatusLocked) return } defer s.IndexingLock.UnlockAll() fc := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock) if err := fc.Truncate(); err != nil { err = fmt.Errorf("failed to empty indexDir: %v with error: %w", s.IndexBuilder.GetIndexDir(), err) s.respondWithError(w, r, route, err, http.StatusInternalServerError) return } resp := response{ Success: true, } s.respondWith(w, r, route, resp) } } func (s *IndexServer) respondWith(w http.ResponseWriter, r *http.Request, route string, data interface{}) { w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(data); err != nil { s.respondWithError(w, r, route, err, http.StatusInternalServerError) return } s.incrementRequestsTotal(r.Method, route, http.StatusOK) } func (s *IndexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error, responseCode int) { type response struct { Success bool `json:"success"` Error string `json:"error"` } if errors.Is(err, context.DeadlineExceeded) { responseCode = http.StatusGatewayTimeout } w.Header().Set("Content-Type", "application/json") w.WriteHeader(responseCode) resp := response{ Success: false, Error: err.Error(), } _ = json.NewEncoder(w).Encode(resp) //nolint:errchkjson s.incrementRequestsTotal(r.Method, route, responseCode) } func (s *IndexServer) respondWithStatus(w http.ResponseWriter, r *http.Request, route string, responseCode int) { w.WriteHeader(responseCode) s.incrementRequestsTotal(r.Method, route, responseCode) } func (s *IndexServer) incrementRequestsTotal(method, route string, responseCode int) { s.metricsRequestsTotal.With(prometheus.Labels{"code": strconv.Itoa(responseCode), "method": method, "route": route}).Inc() } func (s *IndexServer) initMetrics() { s.promRegistry = prometheus.NewRegistry() // Add go runtime metrics and process collectors. s.promRegistry.MustRegister( collectors.NewGoCollector(), collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), ) s.metricsRequestsTotal = promauto.With(s.promRegistry).NewCounterVec( prometheus.CounterOpts{ Name: "gitlab_zoekt_indexer_requests_total", Help: "Total number of HTTP requests by status code, method, and route.", }, []string{"method", "route", "code"}, ) promauto.With(s.promRegistry).NewGaugeFunc( prometheus.GaugeOpts{ Name: "gitlab_zoekt_indexing_locks", Help: "Number of indexing locks currently in progress", }, func() float64 { return float64(s.IndexingLock.InProgressCount()) }, ) }