internal/mode/advanced/indexer/indexer.go (188 lines of code) (raw):

package indexer

import (
	"fmt"
	"strconv"
	"strings"

	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/git"
	logkit "gitlab.com/gitlab-org/labkit/log"
)

// Submitter receives the documents produced by an Indexer (Index, Remove,
// Flush) and supplies the parent identifiers, schema versions and permission
// data that the Indexer attaches to those documents.
type Submitter interface {
	ParentID() int64
	ParentGroupID() int64
	TraversalIDs() string
	HashedRootNamespaceId() int16
	SchemaVersionBlob() uint16
	SchemaVersionCommit() uint16
	SchemaVersionWiki() uint16
	Archived() string
	ProjectPermissions() *ProjectPermissions
	WikiPermissions() *WikiPermissions

	Index(documentType, id string, thing interface{})
	Remove(documentType, id string)

	IsProjectDocument() bool
	IsGroupDocument() bool

	Flush() error
}

// Indexer walks a git.Repository and hands commits, repository blobs or wiki
// blobs to a Submitter. The embedded Encoder is passed to BuildBlob when blob
// documents are built.
type Indexer struct {
	git.Repository
	Submitter
	*Encoder
}

// ProjectPermissions holds the values written to the "visibility_level" and
// "repository_access_level" fields of commit and blob documents.
type ProjectPermissions struct { //nolint:musttag
	VisibilityLevel       int8
	RepositoryAccessLevel int8
}

// WikiPermissions holds the values written to the "visibility_level" and
// "wiki_access_level" fields of wiki blob documents.
type WikiPermissions struct {
	VisibilityLevel int8
	WikiAccessLevel int8
}

// NewIndexer returns an Indexer that reads from repository and submits to
// submitter. Its Encoder is created from repository.GetLimitFileSize().
func NewIndexer(repository git.Repository, submitter Submitter) *Indexer {
	return &Indexer{
		Repository: repository,
		Submitter:  submitter,
		Encoder:    NewEncoder(repository.GetLimitFileSize()),
	}
}

// extractArchivedFlag parses the string form of the archived flag with
// strconv.ParseBool, wrapping the parse error on failure.
func extractArchivedFlag(archived string) (bool, error) {
	archivedBool, err := strconv.ParseBool(archived)
	if err != nil {
		return false, fmt.Errorf("Unable to parse boolean value %w", err)
	}

	return archivedBool, nil
}

// applyArchivedFlag stores the Submitter's archived flag in doc["archived"].
// An empty flag leaves doc untouched; a flag that cannot be parsed as a
// boolean is reported as an error and doc is not modified.
func (i *Indexer) applyArchivedFlag(doc map[string]interface{}) error {
	raw := i.Submitter.Archived()
	if raw == "" {
		return nil
	}

	archived, err := extractArchivedFlag(raw)
	if err != nil {
		return err
	}

	doc["archived"] = archived

	return nil
}

// applyProjectPermissions copies the Submitter's project permissions into doc.
// Nothing is written when the Submitter reports no permissions (nil).
func (i *Indexer) applyProjectPermissions(doc map[string]interface{}) {
	permissions := i.Submitter.ProjectPermissions()
	if permissions == nil {
		return
	}

	doc["visibility_level"] = permissions.VisibilityLevel
	doc["repository_access_level"] = permissions.RepositoryAccessLevel
}

// submitCommit builds the document for c, adds the archived flag, project
// permissions, hashed root namespace id (only when positive) and commit schema
// version, and submits it as a "commit" document.
func (i *Indexer) submitCommit(c *git.Commit) error {
	commit := i.BuildCommit(c)

	doc, err := commit.ToMap()
	if err != nil {
		return fmt.Errorf("Commit %s, %w", c.Hash, err)
	}

	if err := i.applyArchivedFlag(doc); err != nil {
		return err
	}

	i.applyProjectPermissions(doc)

	if namespaceID := i.Submitter.HashedRootNamespaceId(); namespaceID > 0 {
		doc["hashed_root_namespace_id"] = namespaceID
	}

	doc["schema_version"] = i.Submitter.SchemaVersionCommit()

	i.Submitter.Index("commit", commit.ID, doc)

	return nil
}

// submitRepoBlob builds a "blob" document for f at toCommit, joins it to its
// parent project document and submits it. The second argument is ignored.
func (i *Indexer) submitRepoBlob(f *git.File, _, toCommit string) error {
	projectID := i.Submitter.ParentID()

	blob, err := BuildBlob(f, projectID, toCommit, "blob", i.Encoder, i.Submitter.IsProjectDocument())
	if err != nil {
		return fmt.Errorf("Blob %s: %w", f.Path, err)
	}

	joinData := map[string]string{
		"name":   "blob",
		"parent": fmt.Sprintf("project_%v", projectID),
	}

	doc := map[string]interface{}{
		"project_id": projectID,
		"blob":       blob,
		"type":       "blob",
		"join_field": joinData,
	}

	if traversalIDs := i.Submitter.TraversalIDs(); traversalIDs != "" {
		doc["traversal_ids"] = traversalIDs
	}

	if err := i.applyArchivedFlag(doc); err != nil {
		return err
	}

	i.applyProjectPermissions(doc)

	doc["schema_version"] = i.Submitter.SchemaVersionBlob()

	i.Submitter.Index("blob", blob.ID, doc)

	return nil
}

// submitWikiBlob builds a "wiki_blob" document for f at toCommit and submits
// it. Project documents get "project_id", a "wiki_project" rid and the
// archived flag; a positive parent group id adds "group_id", and group
// documents get a "wiki_group" rid. The second argument is ignored.
func (i *Indexer) submitWikiBlob(f *git.File, _, toCommit string) error {
	wikiBlob, err := BuildBlob(f, i.wikiSourceId(), toCommit, "wiki_blob", i.Encoder, i.Submitter.IsProjectDocument())
	if err != nil {
		return fmt.Errorf("WikiBlob %s: %w", f.Path, err)
	}

	doc, err := wikiBlob.ToMap()
	if err != nil {
		return fmt.Errorf("WikiBlob %w", err)
	}

	if i.Submitter.IsProjectDocument() {
		doc["project_id"] = i.Submitter.ParentID()
		doc["rid"] = strings.Replace(wikiBlob.RepoID, "wiki", "wiki_project", 1)

		if err := i.applyArchivedFlag(doc); err != nil {
			return err
		}
	}

	if groupID := i.Submitter.ParentGroupID(); groupID > 0 {
		doc["group_id"] = groupID

		// Group wiki: this rid replaces any project rid written above.
		if i.Submitter.IsGroupDocument() {
			doc["rid"] = strings.Replace(wikiBlob.RepoID, "wiki", "wiki_group", 1)
		}
	}

	doc["schema_version"] = i.Submitter.SchemaVersionWiki()

	if traversalIDs := i.Submitter.TraversalIDs(); traversalIDs != "" {
		doc["traversal_ids"] = traversalIDs
	}

	if permissions := i.Submitter.WikiPermissions(); permissions != nil {
		doc["visibility_level"] = permissions.VisibilityLevel
		doc["wiki_access_level"] = permissions.WikiAccessLevel
	}

	i.Submitter.Index("wiki_blob", wikiBlob.ID, doc)

	return nil
}

// removeWikiBlob asks the Submitter to remove the "wiki_blob" document for
// path. The returned error is always nil.
func (i *Indexer) removeWikiBlob(path string) error {
	blobID := GenerateWikiBlobId(i.wikiSourceId(), path, i.Submitter.IsProjectDocument())

	i.Submitter.Remove("wiki_blob", blobID)

	return nil
}

// removeRepoBlob asks the Submitter to remove the "blob" document for path.
// The returned error is always nil.
func (i *Indexer) removeRepoBlob(path string) error {
	blobID := GenerateBlobID(i.Submitter.ParentID(), path)

	i.Submitter.Remove("blob", blobID)

	return nil
}

// wikiSourceId returns the parent group id for group documents and the parent
// id otherwise.
func (i *Indexer) wikiSourceId() int64 {
	if i.Submitter.IsGroupDocument() {
		return i.Submitter.ParentGroupID()
	}

	return i.Submitter.ParentID()
}

// indexCommits submits every commit yielded by Repository.EachCommit.
func (i *Indexer) indexCommits() error {
	return i.Repository.EachCommit(i.submitCommit)
}

// indexRepoBlobs runs Repository.EachFileChange with the repository blob
// submit and remove callbacks.
func (i *Indexer) indexRepoBlobs() error {
	return i.Repository.EachFileChange(i.submitRepoBlob, i.removeRepoBlob)
}

// indexWikiBlobs runs Repository.EachFileChange with the wiki blob submit and
// remove callbacks.
func (i *Indexer) indexWikiBlobs() error {
	return i.Repository.EachFileChange(i.submitWikiBlob, i.removeWikiBlob)
}

// Flush flushes the underlying Submitter.
func (i *Indexer) Flush() error {
	return i.Submitter.Flush()
}

// IndexBlobs indexes file changes as the given blob type, either "blob" or
// "wiki_blob". Any other value yields an "unknown blob type" error.
func (i *Indexer) IndexBlobs(blobType string) error {
	switch blobType {
	case "blob":
		return i.indexRepoBlobs()
	case "wiki_blob":
		return i.indexWikiBlobs()
	default:
		return fmt.Errorf("unknown blob type: %v", blobType)
	}
}

// IndexCommits indexes the repository's commits. A failure is logged and then
// returned to the caller.
func (i *Indexer) IndexCommits() error {
	if err := i.indexCommits(); err != nil {
		logkit.WithError(err).Error("error while indexing commits")
		return err
	}

	return nil
}