internal/indexer/indexer.go (197 lines of code) (raw):
// Package indexer provides functionality for creating and managing code search indexes
// using the Zoekt search engine. It coordinates between GitLab repositories (via Gitaly)
// and Zoekt indexing, supporting both full and incremental indexing strategies.
//
// The package handles repository content retrieval, tracks file modifications and deletions,
// and manages the indexing process with error handling and a configurable retry hook
// (IndexingFailureFallback retries a failed incremental run as a full reindex). It exposes
// tuning options such as indexing parallelism and a maximum file size, and records the indexed
// commit SHA in the search index so later runs can index incrementally from it.
package indexer
import (
"context"
"errors"
"log/slog"
"time"
"github.com/sourcegraph/zoekt/index"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/zoekt"
)
// MaxFailures is the number of times IndexRepository will hand an indexing
// error to Indexer.OnRetryableFailure (tracked in Indexer.NumFailures) before
// it starts returning errors to the caller directly.
const MaxFailures = 1
// Indexer indexes a single GitLab project's repository, read through Gitaly,
// into a Zoekt index. Configure the exported fields, then call IndexRepository.
type Indexer struct {
// IndexDir is the Zoekt index directory; passed as zoekt.Options.IndexDir.
IndexDir string
// ProjectID identifies the project to both the Gitaly client and Zoekt
// (zoekt.Options.ID), and is included in log lines.
ProjectID uint32
// GitalyAddress, GitalyToken, GitalyStorageName and GitalyRelativePath
// locate the repository; initClients copies them into gitaly.StorageConfig.
GitalyAddress string
GitalyToken string
GitalyStorageName string
GitalyRelativePath string
// gitalyClient is created by initClients for each indexing attempt.
gitalyClient *gitaly.GitalyClient
// zoektClient is (re)created by initZoektClient.
zoektClient *zoekt.Client
// LimitFileSize is passed to the Gitaly client (as int64) and to Zoekt as
// SizeMax. NOTE(review): presumably a byte limit — confirm.
LimitFileSize int
// TargetSHA is the commit to index. When empty, initClients fills it in
// from the Gitaly client's GetCurrentSHA.
TargetSHA string
// ForceReindex requests a full (non-delta) reindex. initClients also sets
// it when no usable indexed SHA exists, and IndexingFailureFallback sets it
// before retrying.
ForceReindex bool
// Writer performs the write phase; IndexRepository fails if it is nil.
Writer indexWriter
// OnRetryableFailure is called by IndexRepository when an attempt fails
// after the clients were initialized and fewer than MaxFailures retries
// have been used; IndexingFailureFallback is one such handler.
OnRetryableFailure func(*Indexer, context.Context, error) (*IndexingResult, error)
// NumFailures counts retries already handed to OnRetryableFailure.
NumFailures int
// Initialized is set by index once the clients are ready, and reset by
// IndexRepository before a retry.
Initialized bool
// Parallelism is passed to Zoekt as zoekt.Options.Parallelism.
Parallelism int
}
// IndexingResult summarizes the file changes applied by one indexing run.
type IndexingResult struct {
// ModifiedFilesCount is the number of files written to the index builder
// (the put callback in DefaultIndexWriter.Write).
ModifiedFilesCount uint
// DeletedFilesCount is the number of paths reported as deleted (the delete
// callback in DefaultIndexWriter.Write).
DeletedFilesCount uint
}
// indexWriter performs the write phase of an indexing run: it consumes the
// prepared IndexParams and reports how many files were changed. Indexer.Writer
// holds the implementation to use; DefaultIndexWriter is the one in this file.
type indexWriter interface {
Write(context.Context, *IndexParams) (*IndexingResult, error)
}
// DefaultIndexWriter streams the file changes reported by the Gitaly client
// into the Zoekt builder and finishes the builder. It holds no state.
type DefaultIndexWriter struct{}
// IndexParams carries everything a writer needs for one indexing run. It is
// built by Indexer.index after the Gitaly and Zoekt clients are initialized.
type IndexParams struct {
// skipIndexing is the Zoekt client's IncrementalSkipIndexing result for this
// run. It is not read by DefaultIndexWriter.Write.
skipIndexing bool
// forceReindex mirrors Indexer.ForceReindex for this run. It is not read by
// DefaultIndexWriter.Write.
forceReindex bool
// zoektBuilder receives file additions and changed/removed marks; its Finish
// method is called once all changes have been applied.
zoektBuilder *index.Builder
// zoektClient adds individual files to zoektBuilder.
zoektClient *zoekt.Client
// gitalyClient enumerates the file changes between its FromHash and ToHash.
gitalyClient *gitaly.GitalyClient
}
// initClients prepares the Gitaly and Zoekt clients for one indexing attempt.
//
// It connects to Gitaly, resolves i.TargetSHA from the repository when it was
// not supplied, and reads the SHA recorded in the existing Zoekt index. The
// run stays incremental (FromHash set to the indexed SHA) only when an indexed
// SHA was found, ForceReindex was not requested, and Gitaly's IsValidSHA
// accepts that SHA; otherwise ForceReindex is set and FromHash is cleared.
// Finally it builds the Zoekt client for i.TargetSHA and sets ToHash.
//
// On success the caller owns i.gitalyClient and must Close it. If a step fails
// after the Gitaly client was created, that client is closed here: index only
// defers Close once initClients has returned successfully, so without this
// the connection would leak on the error path.
func (i *Indexer) initClients(ctx context.Context) (err error) {
	config := &gitaly.StorageConfig{
		Address:      i.GitalyAddress,
		Token:        i.GitalyToken,
		StorageName:  i.GitalyStorageName,
		RelativePath: i.GitalyRelativePath,
	}
	gitalyClient, err := gitaly.NewGitalyClient(ctx, config, i.ProjectID, int64(i.LimitFileSize))
	if err != nil {
		return err
	}
	i.gitalyClient = gitalyClient
	defer func() {
		if err != nil {
			gitalyClient.Close()
		}
	}()

	if i.TargetSHA == "" {
		targetSHA, shaErr := gitalyClient.GetCurrentSHA()
		if shaErr != nil {
			return shaErr
		}
		i.TargetSHA = targetSHA
	}

	zoektSHA, ok, err := i.CurrentSHA()
	if err != nil {
		return err
	}

	// Incremental indexing needs an existing indexed SHA that Gitaly accepts as
	// valid; in every other case fall back to a full reindex. IsValidSHA is only
	// consulted when the cheaper checks pass, matching the original ordering.
	if ok && !i.ForceReindex && gitalyClient.IsValidSHA(zoektSHA) {
		gitalyClient.FromHash = zoektSHA
	} else {
		i.ForceReindex = true
		gitalyClient.FromHash = ""
	}

	// ForceReindex is settled now, so the Zoekt client gets the right IsDelta.
	i.initZoektClient(i.TargetSHA)
	gitalyClient.ToHash = i.TargetSHA
	return nil
}
// initZoektClient replaces i.zoektClient with a new client whose single
// "HEAD" branch is pinned to targetSHA. IsDelta is derived from the current
// value of i.ForceReindex, so ForceReindex should be settled before calling
// this for a client that will be used to write the index.
func (i *Indexer) initZoektClient(targetSHA string) {
	opts := zoekt.Options{
		IndexDir:    i.IndexDir,
		ID:          i.ProjectID,
		IsDelta:     !i.ForceReindex,
		SizeMax:     i.LimitFileSize,
		Parallelism: i.Parallelism,
		Branches: []zoekt.RepositoryBranch{
			{Name: "HEAD", Version: targetSHA},
		},
	}
	i.zoektClient = zoekt.NewZoektClient(&opts)
}
// CurrentSHA returns the commit SHA recorded in the existing Zoekt index for
// this project, along with the boolean reported by the Zoekt client's
// GetCurrentSHA (initClients treats false as "no usable indexed SHA").
//
// As a side effect it replaces i.zoektClient with a fresh client whose HEAD
// version is empty. On error the returned SHA is always "".
func (i *Indexer) CurrentSHA() (string, bool, error) {
	i.initZoektClient("")
	sha, found, err := i.zoektClient.GetCurrentSHA()
	if err == nil {
		return sha, found, nil
	}
	return "", found, err
}
// IndexRepository indexes the repository described by i into i.IndexDir.
//
// It returns an error immediately if Writer is nil. Otherwise it performs one
// indexing attempt. If that attempt fails after the clients were initialized,
// fewer than MaxFailures retries have been used, and OnRetryableFailure is
// set, it resets Initialized, counts the failure and returns whatever
// OnRetryableFailure returns (for example IndexingFailureFallback). In every
// other failure case the attempt's error is returned unchanged.
//
// The result may be nil with a nil error when indexing was skipped: the Zoekt
// client reported incremental indexing can be skipped, or TargetSHA is empty.
func (i *Indexer) IndexRepository(ctx context.Context) (*IndexingResult, error) {
	if i.Writer == nil {
		return nil, errors.New("indexing writer cannot be nil")
	}

	slog.Info("start IndexRepository", "project_id", i.ProjectID, "force", i.ForceReindex)
	startTime := time.Now()

	indexingResult, err := i.index(ctx)
	if err != nil {
		// Only failures after client initialization are retried, at most
		// MaxFailures times, and only when a handler is configured: calling a
		// nil OnRetryableFailure would panic.
		if !i.Initialized || i.NumFailures >= MaxFailures || i.OnRetryableFailure == nil {
			return nil, err
		}
		i.Initialized = false
		i.NumFailures++
		return i.OnRetryableFailure(i, ctx, err)
	}

	slog.Info("finish IndexRepository", "project_id", i.ProjectID, "force", i.ForceReindex, "indexTime", time.Since(startTime).Seconds())
	return indexingResult, nil
}
// index runs a single indexing attempt without any retry handling.
//
// It returns ctx's error if ctx is already done, then initializes the clients
// (closing the Gitaly client when index returns). After that it marks i as
// Initialized, which IndexRepository uses to decide whether a failure may be
// retried. It returns (nil, nil) without writing in two cases: the Zoekt client
// reports incremental indexing can be skipped and no full reindex is forced,
// or TargetSHA is empty. Otherwise it creates a Zoekt builder and delegates
// the write to i.Writer.
func (i *Indexer) index(ctx context.Context) (*IndexingResult, error) {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, ctxErr
	}
	if initErr := i.initClients(ctx); initErr != nil {
		return nil, initErr
	}
	defer i.gitalyClient.Close()
	i.Initialized = true

	skip := i.zoektClient.IncrementalSkipIndexing()
	switch {
	case skip && !i.ForceReindex:
		return nil, nil
	case i.TargetSHA == "":
		slog.Info("skip IndexRepository because TargetSHA is empty", "project_id", i.ProjectID, "force", i.ForceReindex)
		return nil, nil
	}

	builder, err := i.zoektClient.NewBuilder()
	if err != nil {
		return nil, err
	}

	params := &IndexParams{
		skipIndexing: skip,
		forceReindex: i.ForceReindex,
		zoektBuilder: builder,
		zoektClient:  i.zoektClient,
		gitalyClient: i.gitalyClient,
	}
	return i.Writer.Write(ctx, params)
}
// IndexingFailureFallback is a retry handler suitable for
// Indexer.OnRetryableFailure.
//
// If ctx is already done, err is assumed to be caused by that cancellation and
// is returned unchanged. If the failed run was already a full reindex, err is
// also returned unchanged. Otherwise the error is logged, ix is switched to a
// full reindex, and IndexRepository is run again and its result returned.
func IndexingFailureFallback(ix *Indexer, ctx context.Context, err error) (*IndexingResult, error) {
	// ctx.Err() is checked first so a cancelled context never triggers a retry.
	if ctx.Err() != nil || ix.ForceReindex {
		return nil, err
	}

	slog.Info("attempting to force reindex due to incremental indexing error", "error", err)
	ix.ForceReindex = true
	return ix.IndexRepository(ctx)
}
// Write applies every file change reported by p.gitalyClient to
// p.zoektBuilder and then finishes the builder.
//
// Each changed file is first marked as changed/removed on the builder and then
// added under the "HEAD" branch through p.zoektClient. Each deleted path is
// only marked. Both callbacks stop the iteration with ctx's error once ctx is
// done. On success it returns how many files were added and how many paths
// were deleted; on any error the result is nil.
func (w *DefaultIndexWriter) Write(ctx context.Context, p *IndexParams) (*IndexingResult, error) {
	var result IndexingResult
	builder := p.zoektBuilder

	onPut := func(file *gitaly.File) error {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		builder.MarkFileAsChangedOrRemoved(file.Path)
		addErr := p.zoektClient.AddFile(builder, file.Path, file.Content, file.Size, file.TooLarge, []string{"HEAD"})
		if addErr != nil {
			return addErr
		}
		result.ModifiedFilesCount++
		return nil
	}

	onDelete := func(path string) error {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		builder.MarkFileAsChangedOrRemoved(path)
		result.DeletedFilesCount++
		return nil
	}

	// EachFileChange takes no context; cancellation is honored inside the callbacks.
	if err := p.gitalyClient.EachFileChange(onPut, onDelete); err != nil { //nolint:contextcheck
		return nil, err
	}
	if err := builder.Finish(); err != nil {
		return nil, err
	}
	return &result, nil
}