internal/indexer/indexer.go (197 lines of code) (raw):

// Package indexer provides functionality for creating and managing code search indexes // using the Zoekt search engine. It coordinates between GitLab repositories (via Gitaly) // and Zoekt indexing, supporting both full and incremental indexing strategies. // // The package handles repository content retrieval, tracks file modifications and deletions, // and manages the indexing process with error handling and retry mechanisms. It offers // performance optimization options and maintains state between the source repository // and its corresponding search index. package indexer import ( "context" "errors" "log/slog" "time" "github.com/sourcegraph/zoekt/index" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/gitaly" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/zoekt" ) const MaxFailures = 1 type Indexer struct { IndexDir string ProjectID uint32 GitalyAddress string GitalyToken string GitalyStorageName string GitalyRelativePath string gitalyClient *gitaly.GitalyClient zoektClient *zoekt.Client LimitFileSize int TargetSHA string ForceReindex bool Writer indexWriter OnRetryableFailure func(*Indexer, context.Context, error) (*IndexingResult, error) NumFailures int Initialized bool Parallelism int } type IndexingResult struct { ModifiedFilesCount uint DeletedFilesCount uint } type indexWriter interface { Write(context.Context, *IndexParams) (*IndexingResult, error) } type DefaultIndexWriter struct{} type IndexParams struct { skipIndexing bool forceReindex bool zoektBuilder *index.Builder zoektClient *zoekt.Client gitalyClient *gitaly.GitalyClient } func (i *Indexer) initClients(ctx context.Context) error { config := &gitaly.StorageConfig{ Address: i.GitalyAddress, Token: i.GitalyToken, StorageName: i.GitalyStorageName, RelativePath: i.GitalyRelativePath, } gitalyClient, gitalyErr := gitaly.NewGitalyClient(ctx, config, i.ProjectID, int64(i.LimitFileSize)) if gitalyErr != nil { return gitalyErr } i.gitalyClient = gitalyClient if i.TargetSHA == "" { targetSHA, err := i.gitalyClient.GetCurrentSHA() if err != nil { return err } i.TargetSHA = targetSHA } zoektSHA, ok, zoektErr := i.CurrentSHA() if zoektErr != nil { return zoektErr } if !ok || i.ForceReindex { i.ForceReindex = true i.gitalyClient.FromHash = "" } else if i.gitalyClient.IsValidSHA(zoektSHA) { i.gitalyClient.FromHash = zoektSHA } else { i.ForceReindex = true i.gitalyClient.FromHash = "" } i.initZoektClient(i.TargetSHA) i.gitalyClient.ToHash = i.TargetSHA return nil } func (i *Indexer) initZoektClient(targetSHA string) { branches := []zoekt.RepositoryBranch{ { Name: "HEAD", Version: targetSHA, }, } zoektOptions := &zoekt.Options{ IndexDir: i.IndexDir, ID: i.ProjectID, IsDelta: !i.ForceReindex, SizeMax: i.LimitFileSize, Branches: branches, Parallelism: i.Parallelism, } i.zoektClient = zoekt.NewZoektClient(zoektOptions) } func (i *Indexer) CurrentSHA() (string, bool, error) { i.initZoektClient("") zoektSHA, ok, err := i.zoektClient.GetCurrentSHA() if err != nil { return "", ok, err } return zoektSHA, ok, nil } func (i *Indexer) IndexRepository(ctx context.Context) (*IndexingResult, error) { if i.Writer == nil { return nil, errors.New("indexing writer cannot be nil") } slog.Info("start IndexRepository", "project_id", i.ProjectID, "force", i.ForceReindex) startTime := time.Now() indexingResult, err := i.index(ctx) if err != nil { if i.Initialized && i.NumFailures < MaxFailures { i.Initialized = false i.NumFailures += 1 return i.OnRetryableFailure(i, ctx, err) } else { return nil, err } } slog.Info("finish IndexRepository", "project_id", i.ProjectID, "force", i.ForceReindex, "indexTime", time.Since(startTime).Seconds()) return indexingResult, nil } func (i *Indexer) index(ctx context.Context) (*IndexingResult, error) { if err := ctx.Err(); err != nil { return nil, err } if err := i.initClients(ctx); err != nil { return nil, err } defer i.gitalyClient.Close() i.Initialized = true skipIndexing := i.zoektClient.IncrementalSkipIndexing() if skipIndexing && !i.ForceReindex { return nil, nil } if i.TargetSHA == "" { slog.Info("skip IndexRepository because TargetSHA is empty", "project_id", i.ProjectID, "force", i.ForceReindex) return nil, nil } builder, builderErr := i.zoektClient.NewBuilder() if builderErr != nil { return nil, builderErr } return i.Writer.Write(ctx, &IndexParams{ skipIndexing: skipIndexing, forceReindex: i.ForceReindex, zoektBuilder: builder, zoektClient: i.zoektClient, gitalyClient: i.gitalyClient, }) } func IndexingFailureFallback(ix *Indexer, ctx context.Context, err error) (*IndexingResult, error) { if ctx.Err() != nil { // assume error occurred because of context cancellation return nil, err } if !ix.ForceReindex { slog.Info("attempting to force reindex due to incremental indexing error", "error", err) ix.ForceReindex = true return ix.IndexRepository(ctx) } return nil, err } func (w *DefaultIndexWriter) Write(ctx context.Context, p *IndexParams) (*IndexingResult, error) { var modifiedFilesCount, deletedFilesCount uint putFunc := func(file *gitaly.File) error { if err := ctx.Err(); err != nil { return err } p.zoektBuilder.MarkFileAsChangedOrRemoved(file.Path) if e := p.zoektClient.AddFile(p.zoektBuilder, file.Path, file.Content, file.Size, file.TooLarge, []string{"HEAD"}); e != nil { return e } modifiedFilesCount++ return nil } delFunc := func(path string) error { if err := ctx.Err(); err != nil { return err } p.zoektBuilder.MarkFileAsChangedOrRemoved(path) deletedFilesCount++ return nil } if err := p.gitalyClient.EachFileChange(putFunc, delFunc); err != nil { //nolint:contextcheck return nil, err } if err := p.zoektBuilder.Finish(); err != nil { return nil, err } indexingResult := &IndexingResult{ ModifiedFilesCount: modifiedFilesCount, DeletedFilesCount: deletedFilesCount, } return indexingResult, nil }