internal/mode/advanced/advanced.go (223 lines of code) (raw):

package advanced

import (
	"errors"
	"flag"
	"fmt"
	"io"
	"os"
	"time"

	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/elastic"
	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/git"
	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/indexer"
	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/shared"
	"gitlab.com/gitlab-org/labkit/correlation"
	logkit "gitlab.com/gitlab-org/labkit/log"
)

var (
	// Command-line flags. They are registered on the default flag set at
	// package initialization and only hold meaningful values after
	// flag.Parse() has been called (see Run).
	versionFlag               = flag.Bool("version", false, "Print the version and exit")
	skipCommitsFlag           = flag.Bool("skip-commits", false, "Skips indexing commits for the repo")
	searchCurationFlag        = flag.Bool("search-curation", false, "Enables deleting documents from rolled over indices")
	blobTypeFlag              = flag.String("blob-type", "blob", "The type of blobs to index. Accepted values: 'blob', 'wiki_blob'")
	visibilityLevelFlag       = flag.Int("visibility-level", -1, "Project or Group visibility_access_level. Accepted values: 0, 10, 20")
	repositoryAccessLevelFlag = flag.Int("repository-access-level", -1, "Project repository_access_level. Accepted values: 0, 10, 20")
	wikiAccessLevelFlag       = flag.Int("wiki-access-level", -1, "Wiki repository_access_level. Accepted values: 0, 10, 20")
	fullPathFlag              = flag.String("full-path", "", "Full path")
	projectIdFlag             = flag.Int64("project-id", -1, "Project ID")
	groupIdFlag               = flag.Int64("group-id", -1, "Group ID")
	timeoutOptionFlag         = flag.String("timeout", "", "The timeout of the process. Empty string means no timeout. Accepted formats: '1s', '5m', '24h'")
	traversalIdsFlag          = flag.String("traversal-ids", "", "The flag to index the traversal_ids into ES. Accepted formats: '5-1-6-'")
	hashedRootNamespaceIdFlag = flag.Int("hashed-root-namespace-id", -1, "The hashed root namespace id")
	fromSHAFlag               = flag.String("from-sha", "", "The flag to set fromSHA value. Accepted values: SHA-1 hash")
	toSHAFlag                 = flag.String("to-sha", "", "The flag to set toSHA value. Accepted values: SHA-1 hash")
	archivedFlag              = flag.String("archived", "", "The project is archived or not")
	schemaVersionBlobFlag     = flag.Int("schema-version-blob", 0, "The value of schema_version for blob. Format: YYMM")
	schemaVersionCommitFlag   = flag.Int("schema-version-commit", 0, "The value of schema_version for commit. Format: YYMM")
	schemaVersionWikiFlag     = flag.Int("schema-version-wiki", 0, "The value of schema_version for wiki. Format: YYMM")

	// envCorrelationIDKey is the environment variable consulted by
	// generateCorrelationID before a random ID is generated.
	envCorrelationIDKey = "CORRELATION_ID"

	// Permissions is not assigned anywhere in this file.
	// NOTE(review): presumably kept for other files/tests in the package — confirm before removing.
	Permissions *indexer.ProjectPermissions
)

// Run is the entry point of the advanced indexing mode.
//
// It initializes logging, parses the command-line flags, validates the
// arguments, builds the Gitaly and Elasticsearch clients and then indexes
// blobs (and, unless skipped, commits) before flushing the indexer.
//
// Only a logger initialization failure is reported through the returned
// error. Every other failure is logged at Fatal level, which is relied upon
// to terminate the process; the code following each Fatal call assumes it
// does not return.
func Run(buildOpts shared.BuildOpts) error {
	closer, err := configureLogger()
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "error initializing logkit %v", err)
		return err
	}
	defer closer.Close() //nolint:errcheck

	flag.Parse()

	if *versionFlag {
		_, _ = fmt.Fprintf(os.Stdout, "%s %s (built at: %s)", os.Args[0], buildOpts.Version, buildOpts.BuildTime)
		// os.Exit does not run deferred functions, so closer.Close() is skipped here.
		os.Exit(0)
	}

	// Exactly one positional argument is expected: the repository path.
	args := flag.Args()
	if len(args) != 1 {
		argsError := errors.New("WrongArguments")
		logkit.WithError(argsError).Fatalf("Usage: %s [ --version | [--blob-type=(blob|wiki_blob)] [--group-id=<ID>] [--project-id=<ID>] [--skip-commits] [--search-curation] [--full-path=<full-path>] [--timeout=<timeout>] [--visibility-level=<visibility-level>] [--repository-access-level=<repository-access-level>] [--wiki-access-level=<wiki-access-level>] <repo-path> ]", os.Args[0])
	}

	// Both flags default to -1; at least one of them must have been provided.
	projectID := *projectIdFlag
	groupID := *groupIdFlag
	if projectID < 0 && groupID < 0 {
		logkit.WithField("projectID", projectID).WithField("groupID", groupID).Fatalf("Error: Both projectID and groupID are empty")
	}

	repoPath := args[0]
	blobType := *blobTypeFlag
	fromSHA := *fromSHAFlag
	toSHA := *toSHAFlag
	skipCommits := *skipCommitsFlag
	fullPath := *fullPathFlag
	timeoutOption := *timeoutOptionFlag
	searchCuration := *searchCurationFlag
	traversalIds := *traversalIdsFlag
	hashedRootNamespaceId := int16(*hashedRootNamespaceIdFlag) //nolint:gosec
	correlationID := generateCorrelationID()
	archived := *archivedFlag
	schemaVersionBlob := uint16(*schemaVersionBlobFlag)     //nolint:gosec
	schemaVersionCommit := uint16(*schemaVersionCommitFlag) //nolint:gosec
	schemaVersionWiki := uint16(*schemaVersionWikiFlag)     //nolint:gosec

	if traversalIds == "" {
		logkit.WithField("traversalIds", traversalIds).Fatal("Error: traversalIds is empty")
	}

	repo, err := git.NewGitalyClientFromEnv(repoPath, fromSHA, toSHA, correlationID, projectID, fullPath)
	if err != nil {
		logkit.WithFields(
			logkit.Fields{
				"repoPath":      repoPath,
				"fromSHA":       fromSHA,
				"toSHA":         toSHA,
				"correlationID": correlationID,
				"projectID":     projectID,
				"groupID":       groupID,
				"fullPath":      fullPath,
			},
		).WithError(err).Fatal("Error creating gitaly client")
	}

	config, err := loadConfig(projectID, groupID, searchCuration, traversalIds, hashedRootNamespaceId, archived, schemaVersionBlob, schemaVersionCommit, schemaVersionWiki)
	if err != nil {
		logkit.WithError(err).WithFields(
			logkit.Fields{
				"projectID":             projectID,
				"searchCurationFlag":    searchCuration,
				"traversalIds":          traversalIds,
				"hashedRootNamespaceId": hashedRootNamespaceId,
				"archived":              archived,
				"schemaVersionBlob":     schemaVersionBlob,
				"schemaVersionCommit":   schemaVersionCommit,
				"schemaVersionWiki":     schemaVersionWiki,
			}).Fatalf("Error loading config")
	}

	esClient, err := elastic.NewClient(config, correlationID)
	if err != nil {
		logkit.WithError(err).Fatal("Error creating elastic client")
	}

	// A schema version of 0 is the flag default and is treated as "not provided".
	// Each version is only required for the kind of document that will be indexed.
	if esClient.IndexNameWikis != "" && blobType == "wiki_blob" && schemaVersionWiki == 0 {
		logkit.WithField("schemaVersionWiki", schemaVersionWiki).Fatal("Error: schemaVersionWiki is empty")
	}

	if !skipCommits && schemaVersionCommit == 0 {
		logkit.WithField("schemaVersionCommit", schemaVersionCommit).Fatal("Error: schemaVersionCommit is empty")
	}

	if blobType == "blob" && schemaVersionBlob == 0 {
		logkit.WithField("schemaVersionBlob", schemaVersionBlob).Fatal("Error: schemaVersionBlob is empty")
	}

	if timeoutOption != "" {
		timeout, err := time.ParseDuration(timeoutOption)
		if err != nil {
			logkit.WithError(err).WithField("timeoutOption", timeoutOption).Fatalf("Error parsing timeout")
		}

		logkit.WithField("timeout", timeout).Info("Setting timeout")

		// The timer is never stopped: if indexing is still running once the
		// timeout elapses, the callback logs at Fatal level.
		time.AfterFunc(timeout, func() {
			timedOutErr := errors.New("TimedOut")
			logkit.WithError(timedOutErr).WithField("timeout", timeout).Fatalf("The process has timed out")
		})
	}

	idx := indexer.NewIndexer(repo, esClient)

	logkit.WithFields(
		logkit.Fields{
			"IndexNameDefault":      esClient.IndexNameDefault,
			"IndexNameCommits":      esClient.IndexNameCommits,
			"IndexNameWikis":        esClient.IndexNameWikis,
			"projectID":             esClient.ParentID(),
			"blobType":              blobType,
			"skipCommits":           skipCommits,
			"searchCuration":        config.SearchCuration,
			"Permissions":           config.Permissions,
			"PermissionsWiki":       config.PermissionsWiki,
			"traversalIds":          traversalIds,
			"hashedRootNamespaceId": hashedRootNamespaceId,
			"archived":              archived,
			"schemaVersionBlob":     schemaVersionBlob,
			"schemaVersionCommit":   schemaVersionCommit,
			"schemaVersionWiki":     schemaVersionWiki,
		},
	).Debugf("Indexing from %s to %s", repo.FromHash, repo.ToHash)

	if err := idx.IndexBlobs(blobType); err != nil {
		logkit.WithError(err).Fatalln("Indexing error")
	}

	// Commits are only indexed for regular blobs, never for wiki blobs.
	if !skipCommits && blobType == "blob" {
		if err := idx.IndexCommits(); err != nil {
			logkit.WithError(err).Fatalln("Indexing error")
		}
	}

	if err := idx.Flush(); err != nil {
		logkit.WithError(err).Fatalln("Flushing error")
	}

	return nil
}

// configureLogger initializes logkit with the text formatter writing to
// stdout. The level is "debug" when the DEBUG environment variable is set
// (to any value, including the empty string) and "info" otherwise.
//
// It returns the io.Closer produced by logkit.Initialize.
func configureLogger() (io.Closer, error) {
	_, debug := os.LookupEnv("DEBUG")

	level := "info"
	if debug {
		level = "debug"
	}

	return logkit.Initialize(
		logkit.WithLogLevel(level),
		logkit.WithFormatter("text"),
		logkit.WithOutputName("stdout"),
	)
}

// loadConfig reads the base Elasticsearch configuration via
// elastic.ConfigFromEnv and overlays the values derived from the
// command-line flags onto it.
//
// When elastic.ConfigFromEnv fails, the error is returned together with a
// nil config; no fields are written in that case.
func loadConfig(projectID int64, groupID int64, searchCuration bool, traversalIds string, hashedRootNamespaceId int16, archived string, schemaVersionBlob uint16, schemaVersionCommit uint16, schemaVersionWiki uint16) (*elastic.Config, error) {
	config, err := elastic.ConfigFromEnv()
	if err != nil {
		// Bail out before touching config: it must not be dereferenced when
		// the environment could not be turned into a configuration.
		return nil, err
	}

	config.Permissions = generateProjectPermissions()
	config.PermissionsWiki = generateWikiPermissions()
	config.ProjectID = projectID
	config.GroupID = groupID
	config.SearchCuration = searchCuration
	config.TraversalIDs = traversalIds
	config.HashedRootNamespaceId = hashedRootNamespaceId
	config.Archived = archived
	config.SchemaVersionBlob = schemaVersionBlob
	config.SchemaVersionCommit = schemaVersionCommit
	config.SchemaVersionWiki = schemaVersionWiki

	return config, nil
}

// generateCorrelationID returns the value of the CORRELATION_ID environment
// variable when it is non-empty, and otherwise asks correlation.RandomID for
// a fresh one. A failure to generate an ID is logged but not returned.
func generateCorrelationID() string {
	var err error

	cid := os.Getenv(envCorrelationIDKey)
	if cid == "" {
		if cid, err = correlation.RandomID(); err != nil {
			// Should never happen since correlation.RandomID() should not fail,
			// but if it does we return empty string, which is fine.
			logkit.WithError(err).Error("Unable to generate random correlation ID")
		}
	}

	return cid
}

// generateProjectPermissions builds the project permissions from the
// --visibility-level and --repository-access-level flags. It returns nil
// when either flag was left at its default of -1.
func generateProjectPermissions() *indexer.ProjectPermissions {
	visibilityLevel := *visibilityLevelFlag
	repositoryAccessLevel := *repositoryAccessLevelFlag

	if visibilityLevel == -1 || repositoryAccessLevel == -1 {
		return nil
	}

	permissions := new(indexer.ProjectPermissions)
	permissions.VisibilityLevel = int8(visibilityLevel)             //nolint:gosec
	permissions.RepositoryAccessLevel = int8(repositoryAccessLevel) //nolint:gosec

	return permissions
}

// generateWikiPermissions builds the wiki permissions from the
// --visibility-level and --wiki-access-level flags. It returns nil when
// either flag was left at its default of -1.
func generateWikiPermissions() *indexer.WikiPermissions {
	visibilityLevel := *visibilityLevelFlag
	wikiAccessLevel := *wikiAccessLevelFlag

	if visibilityLevel == -1 || wikiAccessLevel == -1 {
		return nil
	}

	permissions := new(indexer.WikiPermissions)
	permissions.VisibilityLevel = int8(visibilityLevel) //nolint:gosec
	permissions.WikiAccessLevel = int8(wikiAccessLevel) //nolint:gosec

	return permissions
}