internal/server/server.go (293 lines of code) (raw):
// Package server implements a complete HTTP API server for the GitLab Zoekt indexer service.
// It provides endpoints for managing code search indexes, including creation, deletion,
// status checking, and maintenance operations.
//
// The package consists of several components:
// - IndexServer: The main server that handles HTTP requests and manages the indexing process
// - Router: Configures routes and middleware for the HTTP API
// - DefaultIndexBuilder: Implements repository indexing and deletion operations
// - Request/response types: Defines the data structures for API communication
//
// The server handles concurrent indexing requests with locking mechanisms, tracks metrics
// via Prometheus, implements callbacks to report operation results back to GitLab, and
// provides standardized JSON responses for all API operations. It integrates with Gitaly
// for repository access and manages the lifecycle of search indexes.
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/callback"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/debug_ls"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/file_cleaner"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/indexer"
)
const (
	// concurrencyLimitEnv is the environment variable read by overIndexingLimit
	// to cap how many indexing locks may be held at once. Unset or unparsable
	// values mean "no limit".
	concurrencyLimitEnv = "ZOEKT_INDEXER_CONCURRENCY_LIMIT"

	// cleanupInterval is the period passed to the file cleaner's
	// StartCleanInterval by StartFileCleaner.
	cleanupInterval time.Duration = 5 * time.Minute
)
// StartIndexingAPI prepares the server and then serves HTTP on httpServer.
//
// It first builds a fresh Prometheus registry (initMetrics) and makes sure the
// index directory exists. It then logs the listen address and blocks in
// ListenAndServe. The error from ListenAndServe is returned unchanged. Per the
// net/http docs this is always non-nil, and after Shutdown/Close it is
// http.ErrServerClosed.
func (s *IndexServer) StartIndexingAPI(httpServer *http.Server) error {
	s.initMetrics()

	err := s.createIndexDir()
	if err != nil {
		return err
	}

	slog.Info("starting server", "address", httpServer.Addr, "path_prefix", s.PathPrefix)

	return httpServer.ListenAndServe()
}
// StartFileCleaner creates a file cleaner for the index directory, which shares
// s.IndexingLock, and initialises it. It then hands control to
// StartCleanInterval with ctx and cleanupInterval. An Init failure is returned
// without starting the interval. Otherwise the result of StartCleanInterval is
// returned.
func (s *IndexServer) StartFileCleaner(ctx context.Context) error {
	cleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock)

	if err := cleaner.Init(); err != nil {
		return err
	}

	return cleaner.StartCleanInterval(ctx, cleanupInterval)
}
// createIndexDir ensures the IndexBuilder's index directory exists on disk.
func (s *IndexServer) createIndexDir() error {
	indexDir := s.IndexBuilder.GetIndexDir()
	return CreateIndexDir(indexDir)
}
// CreateIndexDir makes sure directory d exists, creating any missing parents
// with permission bits 0o755 (subject to umask). A directory that already
// exists is not an error. Any failure is wrapped with the directory path.
func CreateIndexDir(d string) error {
	err := os.MkdirAll(d, 0o755) //nolint:gosec
	if err == nil {
		return nil
	}

	return fmt.Errorf("createIndexDir %s: %w", d, err)
}
// handleStatus returns a handler that reports the SHA of the index currently
// stored for the project whose ID is given in the "id" URL parameter.
//
// Responses:
//   - 200 with {"Success":true,"SHA":"..."} when a current SHA is found.
//   - 400 (plain text from http.Error) when the id is not a valid uint32.
//   - 404 with no body when no current SHA is found.
//   - 500 JSON error (504 if the error wraps context.DeadlineExceeded) when
//     reading the SHA fails.
//
// Every response, including the plain-text 400, is counted in
// gitlab_zoekt_indexer_requests_total.
func (s *IndexServer) handleStatus() http.HandlerFunc {
	route := "status"

	type response struct {
		Success bool
		SHA     string
	}

	return func(w http.ResponseWriter, r *http.Request) {
		param := chi.URLParam(r, "id")
		repoID, err := strconv.ParseUint(param, 10, 32)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			// http.Error bypasses the respondWith* helpers, so the request
			// metric has to be recorded explicitly here.
			s.incrementRequestsTotal(r.Method, route, http.StatusBadRequest)
			return
		}

		idx := &indexer.Indexer{
			IndexDir: s.IndexBuilder.GetIndexDir(),
			// Safe narrowing: ParseUint above was bounded to 32 bits.
			ProjectID: uint32(repoID),
		}

		currentSHA, ok, err := idx.CurrentSHA()
		if err != nil {
			s.respondWithError(w, r, route, err, http.StatusInternalServerError)
			return
		}

		if !ok {
			s.respondWithStatus(w, r, route, http.StatusNotFound)
			return
		}

		resp := response{
			Success: true,
			SHA:     currentSHA,
		}

		s.respondWith(w, r, route, resp)
	}
}
// handleDelete returns a handler that removes the index of the repository whose
// ID is given in the "id" URL parameter.
//
// The repository's indexing lock is acquired with TryLock and held until the
// handler returns. If the lock is already taken, the request is refused with
// 423 Locked. DeleteRepository receives s.IndexingLock and callbacks that
// forward success or failure to s.CallbackAPI using the request context.
//
// Responses:
//   - 200 with {"Success":true} when DeleteRepository succeeds.
//   - 400 (plain text from http.Error) when the id is not a valid uint32.
//   - 423 JSON error when indexing is in progress for the repository.
//   - 500 JSON error (504 if the error wraps context.DeadlineExceeded) when
//     DeleteRepository fails.
//
// Every response, including the plain-text 400, is counted in
// gitlab_zoekt_indexer_requests_total.
func (s *IndexServer) handleDelete() http.HandlerFunc {
	route := "delete"

	return func(w http.ResponseWriter, r *http.Request) {
		param := chi.URLParam(r, "id")
		rID, err := strconv.ParseUint(param, 10, 32)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			// http.Error bypasses the respondWith* helpers, so the request
			// metric has to be recorded explicitly here.
			s.incrementRequestsTotal(r.Method, route, http.StatusBadRequest)
			return
		}

		// Safe narrowing: ParseUint above was bounded to 32 bits.
		repoID := uint32(rID)

		if !s.IndexingLock.TryLock(repoID) {
			s.respondWithError(w, r, route, errors.New("indexing is in progress"), http.StatusLocked)
			return
		}
		defer s.IndexingLock.Unlock(repoID)

		deleteRequest := DeleteRequest{
			RepoID: repoID,
			Callback: &callback.CallbackParams{
				Name:         "delete",
				RailsPayload: map[string]uint32{"RepoId": repoID},
			},
		}

		err = s.IndexBuilder.DeleteRepository(
			deleteRequest,
			callback.CallbackFunc{
				OnSuccess: func(params callback.CallbackParams) { //nolint:contextcheck
					s.CallbackAPI.SendSuccess(r.Context(), params, s.IndexBuilder.GetIndexDir(), deleteRequest.RepoID)
				},
				OnFailure: func(params callback.CallbackParams, errorReason error) { //nolint:contextcheck
					s.CallbackAPI.SendFailure(r.Context(), params, s.IndexBuilder.GetIndexDir(), deleteRequest.RepoID, errorReason)
				},
			},
			s.IndexingLock,
		)
		if err != nil {
			err = fmt.Errorf("failed to remove shards for repoID: %d from %v with error: %w", repoID, s.IndexBuilder.GetIndexDir(), err)
			s.respondWithError(w, r, route, err, http.StatusInternalServerError)
			return
		}

		resp := struct {
			Success bool
		}{
			Success: true,
		}

		s.respondWith(w, r, route, resp)
	}
}
// handleMetrics returns a handler that exposes s.promRegistry in the Prometheus
// exposition format.
//
// The promhttp handler is built on every request from the value s.promRegistry
// holds at that moment. Passing the registry in HandlerOpts lets promhttp
// register its own handler-error counter there.
// NOTE(review): building per request presumably lets routes be wired before
// initMetrics runs (StartIndexingAPI calls initMetrics) — confirm before hoisting.
func (s *IndexServer) handleMetrics() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		registry := s.promRegistry
		opts := promhttp.HandlerOpts{Registry: registry}

		promhttp.HandlerFor(registry, opts).ServeHTTP(w, r)
	}
}
// handleDebugLs returns a handler that responds with the JSON-encoded result of
// debug_ls.ListFiles for the index directory. If listing fails, it responds with
// a JSON 500 error.
func (s *IndexServer) handleDebugLs() http.HandlerFunc {
	const route = "debug/ls"

	return func(w http.ResponseWriter, r *http.Request) {
		files, err := debug_ls.ListFiles(s.IndexBuilder.GetIndexDir())
		if err != nil {
			s.respondWithError(w, r, route, err, http.StatusInternalServerError)
			return
		}

		s.respondWith(w, r, route, files)
	}
}
// decode reads a single JSON value from the request body into v. Fields in the
// payload that v does not declare cause an error (DisallowUnknownFields), so an
// unexpected request shape is rejected rather than silently ignored.
func (s *IndexServer) decode(r *http.Request, v interface{}) error {
	decoder := json.NewDecoder(r.Body)
	decoder.DisallowUnknownFields()

	return decoder.Decode(v)
}
// overIndexingLimit reports whether the number of in-progress indexing locks has
// reached the limit set in the concurrencyLimitEnv environment variable.
//
// The variable is read on every call. When it is unset, or is not a valid base-10
// integer that fits in 32 bits, no limit is enforced and false is returned.
// NOTE(review): assuming InProgressCount is never negative, a configured limit
// of 0 or below makes every call return true — confirm that is intended.
func (s *IndexServer) overIndexingLimit() bool {
	raw, set := os.LookupEnv(concurrencyLimitEnv)
	if !set {
		return false
	}

	limit, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return false
	}

	return int(limit) <= s.IndexingLock.InProgressCount()
}
// handleIndex returns a handler that indexes the repository described by the
// JSON request body. The body is strictly decoded into an IndexRequest, and
// unknown fields are rejected.
//
// The request is refused if the concurrency limit from concurrencyLimitEnv has
// been reached. It is also refused if the repository's indexing lock is already
// held. Otherwise the lock is held until the handler returns.
// IndexRepository runs with the request context and callbacks that forward
// success or failure to s.CallbackAPI.
//
// Responses:
//   - 200 with {"Success":true} when IndexRepository succeeds.
//   - 400 (plain text "json parser error") when the body cannot be decoded.
//   - 429 (plain text "over concurrency limit") when the limit is reached.
//   - 423 JSON error when indexing of this repository is already in progress.
//   - 500 JSON error (504 if the error wraps context.DeadlineExceeded) when
//     IndexRepository fails.
//
// Every response, including the plain-text 400 and 429, is counted in
// gitlab_zoekt_indexer_requests_total. Previously, throttled requests never
// showed up in that metric.
func (s *IndexServer) handleIndex() http.HandlerFunc {
	route := "index"

	type response struct {
		Success bool
	}

	parseRequest := func(r *http.Request) (IndexRequest, error) {
		var req IndexRequest

		err := s.decode(r, &req)
		if err != nil {
			return req, errors.New("json parser error")
		}

		return req, nil
	}

	return func(w http.ResponseWriter, r *http.Request) {
		req, err := parseRequest(r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			// http.Error bypasses the respondWith* helpers, so the request
			// metric has to be recorded explicitly here.
			s.incrementRequestsTotal(r.Method, route, http.StatusBadRequest)
			return
		}

		if s.overIndexingLimit() {
			http.Error(w, "over concurrency limit", http.StatusTooManyRequests)
			s.incrementRequestsTotal(r.Method, route, http.StatusTooManyRequests)
			return
		}

		if !s.IndexingLock.TryLock(req.RepoID) {
			s.respondWithError(w, r, route, errors.New("indexing is already in progress"), http.StatusLocked)
			return
		}
		defer s.IndexingLock.Unlock(req.RepoID)

		err = s.IndexBuilder.IndexRepository(
			r.Context(),
			req,
			callback.CallbackFunc{
				OnSuccess: func(params callback.CallbackParams) { //nolint:contextcheck
					s.CallbackAPI.SendSuccess(r.Context(), params, s.IndexBuilder.GetIndexDir(), req.RepoID)
				},
				OnFailure: func(params callback.CallbackParams, errorReason error) { //nolint:contextcheck
					s.CallbackAPI.SendFailure(r.Context(), params, s.IndexBuilder.GetIndexDir(), req.RepoID, errorReason)
				},
			},
		)
		if err != nil {
			s.respondWithError(w, r, route, err, http.StatusInternalServerError)
			return
		}

		resp := response{
			Success: true,
		}

		s.respondWith(w, r, route, resp)
	}
}
// handleTruncate returns a handler that empties the whole index directory.
//
// It takes every indexing lock via IndexingLock.LockAll and holds them until the
// handler returns. If LockAll fails, the handler responds with that error and
// 423 Locked. A Truncate failure produces a JSON 500 error that names the
// directory. Success yields {"Success":true}.
func (s *IndexServer) handleTruncate() http.HandlerFunc {
	route := "truncate"

	type response struct {
		Success bool
	}

	return func(w http.ResponseWriter, r *http.Request) {
		lockErr := s.IndexingLock.LockAll()
		if lockErr != nil {
			s.respondWithError(w, r, route, lockErr, http.StatusLocked)
			return
		}
		defer s.IndexingLock.UnlockAll()

		cleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock)

		if truncErr := cleaner.Truncate(); truncErr != nil {
			wrapped := fmt.Errorf("failed to empty indexDir: %v with error: %w", s.IndexBuilder.GetIndexDir(), truncErr)
			s.respondWithError(w, r, route, wrapped, http.StatusInternalServerError)
			return
		}

		s.respondWith(w, r, route, response{Success: true})
	}
}
// respondWith sends data as a JSON body with Content-Type application/json and
// an implicit 200 status. It then counts the request in the requests metric
// under code 200. If encoding fails, it delegates to respondWithError with 500,
// and that helper records the metric instead.
// NOTE(review): if the failure occurs while writing to w, the 200 header may
// already be on the wire, making the fallback's WriteHeader superfluous — confirm.
func (s *IndexServer) respondWith(w http.ResponseWriter, r *http.Request, route string, data interface{}) {
	w.Header().Set("Content-Type", "application/json")

	encodeErr := json.NewEncoder(w).Encode(data)
	if encodeErr != nil {
		s.respondWithError(w, r, route, encodeErr, http.StatusInternalServerError)
		return
	}

	s.incrementRequestsTotal(r.Method, route, http.StatusOK)
}
// respondWithError writes {"success":false,"error":"<err>"} as JSON with the
// given status code, then records that code in the requests metric. Any err
// wrapping context.DeadlineExceeded overrides responseCode with 504 Gateway
// Timeout.
func (s *IndexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error, responseCode int) {
	type response struct {
		Success bool   `json:"success"`
		Error   string `json:"error"`
	}

	code := responseCode
	if errors.Is(err, context.DeadlineExceeded) {
		code = http.StatusGatewayTimeout
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)

	// Success is left at its zero value (false).
	body := response{Error: err.Error()}

	// The header is already sent, so an encode/write failure has no better
	// recovery than dropping the body.
	_ = json.NewEncoder(w).Encode(body) //nolint:errchkjson

	s.incrementRequestsTotal(r.Method, route, code)
}
// respondWithStatus sends only the status code. It sets no Content-Type and
// writes no body. It then records the code in the requests metric.
func (s *IndexServer) respondWithStatus(w http.ResponseWriter, r *http.Request, route string, responseCode int) {
	status := responseCode

	w.WriteHeader(status)
	s.incrementRequestsTotal(r.Method, route, status)
}
// incrementRequestsTotal adds one to the requests counter created in
// initMetrics, labelled with the HTTP method, route name and status code.
func (s *IndexServer) incrementRequestsTotal(method, route string, responseCode int) {
	labels := prometheus.Labels{
		"method": method,
		"route":  route,
		"code":   strconv.Itoa(responseCode),
	}

	s.metricsRequestsTotal.With(labels).Inc()
}
// initMetrics replaces s.promRegistry with a new registry and registers the
// following on it:
//   - the Go runtime and process collectors;
//   - gitlab_zoekt_indexer_requests_total, a counter labelled by method, route
//     and code, stored in s.metricsRequestsTotal;
//   - gitlab_zoekt_indexing_locks, a gauge whose value is computed from
//     IndexingLock.InProgressCount each time it is collected.
//
// MustRegister and the promauto factory panic if registration fails.
func (s *IndexServer) initMetrics() {
	registry := prometheus.NewRegistry()
	s.promRegistry = registry

	// Add go runtime metrics and process collectors.
	registry.MustRegister(
		collectors.NewGoCollector(),
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
	)

	factory := promauto.With(registry)

	s.metricsRequestsTotal = factory.NewCounterVec(
		prometheus.CounterOpts{
			Name: "gitlab_zoekt_indexer_requests_total",
			Help: "Total number of HTTP requests by status code, method, and route.",
		},
		[]string{"method", "route", "code"},
	)

	factory.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "gitlab_zoekt_indexing_locks",
			Help: "Number of indexing locks currently in progress",
		},
		func() float64 {
			return float64(s.IndexingLock.InProgressCount())
		},
	)
}