internal/mode/advanced/indexer/indexer.go (188 lines of code) (raw):
package indexer
import (
"fmt"
logkit "gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/git"
"strings"
"strconv"
)
// Submitter is the destination the Indexer writes documents to, combined with
// the metadata accessors whose values the Indexer copies onto each document
// it builds.
type Submitter interface {
// ParentID is used as the "project_id" value, as the "project_<id>" join
// parent for repository blobs, and as the source ID for blob IDs when the
// document is not a group document.
ParentID() int64
// ParentGroupID is written to "group_id" on wiki blobs when it is greater
// than zero, and is the wiki source ID for group documents.
ParentGroupID() int64
// TraversalIDs is written to "traversal_ids" on blobs and wiki blobs when it
// is not the empty string.
TraversalIDs() string
// HashedRootNamespaceId is written to "hashed_root_namespace_id" on commits
// when it is greater than zero.
HashedRootNamespaceId() int16
// SchemaVersionBlob is written to "schema_version" on repository blobs.
SchemaVersionBlob() uint16
// SchemaVersionCommit is written to "schema_version" on commits.
SchemaVersionCommit() uint16
// SchemaVersionWiki is written to "schema_version" on wiki blobs.
SchemaVersionWiki() uint16
// Archived is a boolean encoded as a string. An empty string means the
// "archived" field is omitted; any other value must be accepted by
// strconv.ParseBool or the submit helpers return an error.
Archived() string
// ProjectPermissions may return nil, in which case no project permission
// fields are written to commits or repository blobs.
ProjectPermissions() *ProjectPermissions
// WikiPermissions may return nil, in which case no wiki permission fields
// are written to wiki blobs.
WikiPermissions() *WikiPermissions
// Index receives a document type ("commit", "blob" or "wiki_blob" in this
// file), the document ID and the document body.
Index(documentType, id string, thing interface{})
// Remove receives a document type ("blob" or "wiki_blob" in this file) and
// the ID of the document to remove.
Remove(documentType, id string)
// IsProjectDocument is forwarded to BuildBlob and GenerateWikiBlobId and
// gates the project-specific fields on wiki blobs.
IsProjectDocument() bool
// IsGroupDocument selects the group ID as the wiki source ID and the
// "wiki_group" form of the wiki blob "rid".
IsGroupDocument() bool
// Flush is called by Indexer.Flush.
// NOTE(review): presumably sends any buffered operations — confirm against
// the implementation.
Flush() error
}
// Indexer combines a git repository, a Submitter and an Encoder. All three
// are embedded, so their methods are promoted onto Indexer; the Flush method
// declared on *Indexer shadows the promoted Submitter.Flush.
type Indexer struct {
// Repository is walked by the index* helpers through EachCommit and
// EachFileChange.
git.Repository
// Submitter receives the documents produced by the submit* and remove*
// helpers and supplies the metadata copied onto them.
Submitter
// Encoder is passed to BuildBlob; NewIndexer constructs it from the
// repository's GetLimitFileSize value.
*Encoder
}
// ProjectPermissions holds the permission values that submitCommit and
// submitRepoBlob copy onto each document when the Submitter returns a non-nil
// value.
type ProjectPermissions struct { //nolint:musttag
// VisibilityLevel is written to the "visibility_level" document field.
VisibilityLevel int8
// RepositoryAccessLevel is written to the "repository_access_level"
// document field.
RepositoryAccessLevel int8
}
// WikiPermissions holds the permission values that submitWikiBlob copies onto
// each wiki blob document when the Submitter returns a non-nil value.
type WikiPermissions struct {
// VisibilityLevel is written to the "visibility_level" document field.
VisibilityLevel int8
// WikiAccessLevel is written to the "wiki_access_level" document field.
WikiAccessLevel int8
}
// NewIndexer returns an Indexer that reads from repository and writes to
// submitter. The Indexer's Encoder is created with NewEncoder, using the
// value reported by repository.GetLimitFileSize().
func NewIndexer(repository git.Repository, submitter Submitter) *Indexer {
	encoder := NewEncoder(repository.GetLimitFileSize())

	indexer := &Indexer{
		Repository: repository,
		Submitter:  submitter,
		Encoder:    encoder,
	}
	return indexer
}
// extractArchivedFlag converts the string form of the archived flag into a
// bool using strconv.ParseBool.
//
// On a parse failure it returns false together with an error that wraps the
// strconv error.
func extractArchivedFlag(archived string) (bool, error) {
	flag, parseErr := strconv.ParseBool(archived)
	if parseErr == nil {
		return flag, nil
	}
	return false, fmt.Errorf("Unable to parse boolean value %w", parseErr)
}
// submitCommit builds the index document for one commit and hands it to the
// Submitter under the "commit" document type.
//
// Optional fields are added only when the Submitter supplies them:
// "archived" when Archived() is non-empty, the project visibility and
// repository access levels when ProjectPermissions() is non-nil, and
// "hashed_root_namespace_id" when it is greater than zero. "schema_version"
// is always set.
//
// An error is returned, and nothing is submitted, when the commit cannot be
// converted to a map or when the archived flag is not a valid boolean string.
func (i *Indexer) submitCommit(c *git.Commit) error {
	var (
		doc map[string]interface{}
		err error
	)

	built := i.BuildCommit(c)
	if doc, err = built.ToMap(); err != nil {
		return fmt.Errorf("Commit %s, %w", c.Hash, err)
	}

	if i.Submitter.Archived() != "" {
		flag, flagErr := extractArchivedFlag(i.Submitter.Archived())
		if flagErr != nil {
			return flagErr
		}
		doc["archived"] = flag
	}

	if perms := i.Submitter.ProjectPermissions(); perms != nil {
		doc["visibility_level"] = perms.VisibilityLevel
		doc["repository_access_level"] = perms.RepositoryAccessLevel
	}

	if i.Submitter.HashedRootNamespaceId() > 0 {
		doc["hashed_root_namespace_id"] = i.Submitter.HashedRootNamespaceId()
	}

	doc["schema_version"] = i.Submitter.SchemaVersionCommit()

	i.Submitter.Index("commit", built.ID, doc)
	return nil
}
// submitRepoBlob builds the index document for one repository file and hands
// it to the Submitter under the "blob" document type.
//
// The second string argument is ignored; toCommit is forwarded to BuildBlob.
// The document always carries "project_id", "blob", "type", "join_field" and
// "schema_version". "traversal_ids", "archived" and the project permission
// fields are added only when the Submitter supplies them.
//
// An error is returned, and nothing is submitted, when BuildBlob fails or
// when the archived flag is not a valid boolean string.
func (i *Indexer) submitRepoBlob(f *git.File, _, toCommit string) error {
	blob, err := BuildBlob(f, i.Submitter.ParentID(), toCommit, "blob", i.Encoder, i.Submitter.IsProjectDocument())
	if err != nil {
		return fmt.Errorf("Blob %s: %w", f.Path, err)
	}

	// The blob is joined to its parent project document.
	parentRef := fmt.Sprintf("project_%v", i.Submitter.ParentID())
	join := map[string]string{"name": "blob", "parent": parentRef}

	doc := make(map[string]interface{})
	doc["project_id"] = i.Submitter.ParentID()
	doc["blob"] = blob
	doc["type"] = "blob"
	doc["join_field"] = join

	if i.Submitter.TraversalIDs() != "" {
		doc["traversal_ids"] = i.Submitter.TraversalIDs()
	}

	if i.Submitter.Archived() != "" {
		flag, flagErr := extractArchivedFlag(i.Submitter.Archived())
		if flagErr != nil {
			return flagErr
		}
		doc["archived"] = flag
	}

	if perms := i.Submitter.ProjectPermissions(); perms != nil {
		doc["visibility_level"] = perms.VisibilityLevel
		doc["repository_access_level"] = perms.RepositoryAccessLevel
	}

	doc["schema_version"] = i.Submitter.SchemaVersionBlob()

	i.Submitter.Index("blob", blob.ID, doc)
	return nil
}
// submitWikiBlob builds the index document for one wiki file and hands it to
// the Submitter under the "wiki_blob" document type.
//
// The second string argument is ignored; toCommit is forwarded to BuildBlob.
// For project documents "project_id" is set, "rid" is rewritten to its
// "wiki_project" form and "archived" is added when the Submitter provides it.
// When the parent group ID is positive "group_id" is set, and for group
// documents "rid" is rewritten to its "wiki_group" form. "schema_version" is
// always set; "traversal_ids" and the wiki permission fields are added only
// when the Submitter supplies them.
//
// An error is returned, and nothing is submitted, when BuildBlob or ToMap
// fails or when the archived flag is not a valid boolean string.
func (i *Indexer) submitWikiBlob(f *git.File, _, toCommit string) error {
	const docType = "wiki_blob"

	wikiBlob, err := BuildBlob(f, i.wikiSourceId(), toCommit, docType, i.Encoder, i.Submitter.IsProjectDocument())
	if err != nil {
		return fmt.Errorf("WikiBlob %s: %w", f.Path, err)
	}

	doc, err := wikiBlob.ToMap()
	if err != nil {
		return fmt.Errorf("WikiBlob %w", err)
	}

	// setRID replaces the first "wiki" in the blob's repo ID with prefix and
	// stores the result as the document's "rid".
	setRID := func(prefix string) {
		doc["rid"] = strings.Replace(wikiBlob.RepoID, "wiki", prefix, 1)
	}

	if i.Submitter.IsProjectDocument() {
		doc["project_id"] = i.Submitter.ParentID()
		setRID("wiki_project")

		if i.Submitter.Archived() != "" {
			flag, flagErr := extractArchivedFlag(i.Submitter.Archived())
			if flagErr != nil {
				return flagErr
			}
			doc["archived"] = flag
		}
	}

	if i.Submitter.ParentGroupID() > 0 {
		doc["group_id"] = i.Submitter.ParentGroupID()

		// Group wiki
		if i.Submitter.IsGroupDocument() {
			setRID("wiki_group")
		}
	}

	doc["schema_version"] = i.Submitter.SchemaVersionWiki()

	if i.Submitter.TraversalIDs() != "" {
		doc["traversal_ids"] = i.Submitter.TraversalIDs()
	}

	if perms := i.Submitter.WikiPermissions(); perms != nil {
		doc["visibility_level"] = perms.VisibilityLevel
		doc["wiki_access_level"] = perms.WikiAccessLevel
	}

	i.Submitter.Index(docType, wikiBlob.ID, doc)
	return nil
}
// removeWikiBlob asks the Submitter to remove the "wiki_blob" document for
// path. The document ID is generated from the wiki source ID, the path and
// whether this is a project document. It always returns nil.
func (i *Indexer) removeWikiBlob(path string) error {
	sourceID := i.wikiSourceId()
	forProject := i.Submitter.IsProjectDocument()

	docID := GenerateWikiBlobId(sourceID, path, forProject)
	i.Submitter.Remove("wiki_blob", docID)

	return nil
}
// removeRepoBlob asks the Submitter to remove the "blob" document for path.
// The document ID is generated from the Submitter's parent ID and the path.
// It always returns nil.
func (i *Indexer) removeRepoBlob(path string) error {
	i.Submitter.Remove("blob", GenerateBlobID(i.Submitter.ParentID(), path))
	return nil
}
// wikiSourceId returns the ID that identifies the owner of the wiki being
// indexed: the parent group ID for group documents, otherwise the parent ID.
func (i *Indexer) wikiSourceId() int64 {
	if i.Submitter.IsGroupDocument() {
		return i.Submitter.ParentGroupID()
	}
	return i.Submitter.ParentID()
}
// indexCommits passes submitCommit to the repository's EachCommit and returns
// whatever error EachCommit reports.
func (i *Indexer) indexCommits() error {
	onCommit := i.submitCommit
	return i.Repository.EachCommit(onCommit)
}
// indexRepoBlobs passes submitRepoBlob and removeRepoBlob to the repository's
// EachFileChange and returns whatever error EachFileChange reports.
func (i *Indexer) indexRepoBlobs() error {
	onPut, onRemove := i.submitRepoBlob, i.removeRepoBlob
	return i.Repository.EachFileChange(onPut, onRemove)
}
// indexWikiBlobs passes submitWikiBlob and removeWikiBlob to the repository's
// EachFileChange and returns whatever error EachFileChange reports.
func (i *Indexer) indexWikiBlobs() error {
	onPut, onRemove := i.submitWikiBlob, i.removeWikiBlob
	return i.Repository.EachFileChange(onPut, onRemove)
}
// Flush delegates to the embedded Submitter's Flush and returns its error.
func (i *Indexer) Flush() error {
	// The Submitter field is named explicitly: this method shadows the
	// promoted Flush, so calling i.Flush() here would recurse.
	err := i.Submitter.Flush()
	return err
}
// IndexBlobs indexes the file changes of the repository as the given blob
// type: "blob" for repository files or "wiki_blob" for wiki files.
//
// Any other blobType yields an "unknown blob type" error and nothing is
// indexed.
func (i *Indexer) IndexBlobs(blobType string) error {
	if blobType == "blob" {
		return i.indexRepoBlobs()
	}
	if blobType == "wiki_blob" {
		return i.indexWikiBlobs()
	}
	return fmt.Errorf("unknown blob type: %v", blobType)
}
// IndexCommits indexes the repository's commits via indexCommits. When that
// fails, the error is logged with the message "error while indexing commits"
// and then returned to the caller unchanged.
func (i *Indexer) IndexCommits() error {
	err := i.indexCommits()
	if err == nil {
		return nil
	}

	logkit.WithError(err).Error("error while indexing commits")
	return err
}