internal/mode/advanced/advanced.go (223 lines of code) (raw):
package advanced
import (
"errors"
"flag"
"fmt"
"io"
"os"
"time"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/elastic"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/git"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/indexer"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/shared"
"gitlab.com/gitlab-org/labkit/correlation"
logkit "gitlab.com/gitlab-org/labkit/log"
)
// Command-line flags and package-level state for the advanced indexing mode.
//
// Every flag below is registered on the default flag set when the package is
// initialised and holds its parsed value only after flag.Parse has been called
// (Run does this). Numeric flags that default to -1 use that value to mean
// "not provided".
var (
// General behaviour switches.
versionFlag = flag.Bool("version", false, "Print the version and exit")
skipCommitsFlag = flag.Bool("skip-commits", false, "Skips indexing commits for the repo")
searchCurationFlag = flag.Bool("search-curation", false, "Enables deleting documents from rolled over indices")
blobTypeFlag = flag.String("blob-type", "blob", "The type of blobs to index. Accepted values: 'blob', 'wiki_blob'")
// Permission levels; read by generateProjectPermissions and
// generateWikiPermissions, which return nil when a required level is -1.
visibilityLevelFlag = flag.Int("visibility-level", -1, "Project or Group visibility_access_level. Accepted values: 0, 10, 20")
repositoryAccessLevelFlag = flag.Int("repository-access-level", -1, "Project repository_access_level. Accepted values: 0, 10, 20")
wikiAccessLevelFlag = flag.Int("wiki-access-level", -1, "Wiki repository_access_level. Accepted values: 0, 10, 20")
// Identification of what is being indexed. Run requires at least one of
// project-id / group-id to be non-negative.
fullPathFlag = flag.String("full-path", "", "Full path")
projectIdFlag = flag.Int64("project-id", -1, "Project ID")
groupIdFlag = flag.Int64("group-id", -1, "Group ID")
// Parsed by Run with time.ParseDuration when non-empty.
timeoutOptionFlag = flag.String("timeout", "", "The timeout of the process. Empty string means no timeout. Accepted formats: '1s', '5m', '24h'")
// Run treats an empty traversal-ids value as a fatal error.
traversalIdsFlag = flag.String("traversal-ids", "", "The flag to index the traversal_ids into ES. Accepted formats: '5-1-6-'")
// Converted to int16 by Run before being stored in the config.
hashedRootNamespaceIdFlag = flag.Int("hashed-root-namespace-id", -1, "The hashed root namespace id")
// Commit range handed to the Gitaly client.
fromSHAFlag = flag.String("from-sha", "", "The flag to set fromSHA value. Accepted values: SHA-1 hash")
toSHAFlag = flag.String("to-sha", "", "The flag to set toSHA value. Accepted values: SHA-1 hash")
// Passed through to the config as a string, unparsed.
archivedFlag = flag.String("archived", "", "The project is archived or not")
// Schema versions; converted to uint16 by Run. A value of 0 is treated by
// Run as "missing" for the document types that are about to be indexed.
schemaVersionBlobFlag = flag.Int("schema-version-blob", 0, "The value of schema_version for blob. Format: YYMM")
schemaVersionCommitFlag = flag.Int("schema-version-commit", 0, "The value of schema_version for commit. Format: YYMM")
schemaVersionWikiFlag = flag.Int("schema-version-wiki", 0, "The value of schema_version for wiki. Format: YYMM")
// Name of the environment variable generateCorrelationID reads first.
envCorrelationIDKey = "CORRELATION_ID"
// Permissions is an exported package-level variable.
// NOTE(review): nothing in this file assigns or reads it — confirm whether
// external packages still depend on it.
Permissions *indexer.ProjectPermissions
)
// Run executes the advanced indexing mode end to end: it configures logging,
// parses the command-line flags, builds the Gitaly and Elasticsearch clients,
// then indexes blobs (plus commits when the blob type is "blob" and commits
// are not skipped) and finally flushes the indexer.
//
// buildOpts supplies the version and build time printed for --version.
//
// The only error returned to the caller is a logger initialisation failure.
// Every other failure is reported through a Fatal-level log call; the code
// following each of those calls assumes the call terminated the process.
// On success Run returns nil.
func Run(buildOpts shared.BuildOpts) error {
	closer, err := configureLogger()
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "error initializing logkit %v", err)
		return err
	}
	defer closer.Close() //nolint:errcheck

	flag.Parse()

	if *versionFlag {
		_, _ = fmt.Fprintf(os.Stdout, "%s %s (built at: %s)", os.Args[0], buildOpts.Version, buildOpts.BuildTime)
		// os.Exit does not run deferred calls, so the logger closer is skipped here.
		os.Exit(0)
	}

	// Exactly one positional argument (the repository path) is expected.
	args := flag.Args()
	if len(args) != 1 {
		argsError := errors.New("WrongArguments")
		logkit.WithError(argsError).Fatalf("Usage: %s [ --version | [--blob-type=(blob|wiki_blob)] [--group-id=<ID>] [--project-id=<ID>] [--skip-commits] [--search-curation] [--full-path=<full-path>] [--timeout=<timeout>] [--visibility-level=<visibility-level>] [--repository-access-level=<repository-access-level>] [--wiki-access-level=<wiki-access-level>] <repo-path> ]", os.Args[0])
	}

	projectID := *projectIdFlag
	groupID := *groupIdFlag
	if projectID < 0 && groupID < 0 {
		// err is nil at this point, so attach a dedicated error instead of
		// logging an empty one.
		idsError := errors.New("MissingProjectAndGroupID")
		logkit.WithError(idsError).WithField("projectID", projectID).WithField("groupID", groupID).Fatalf("Error: Both projectID and groupID are empty")
	}

	repoPath := args[0]
	blobType := *blobTypeFlag
	fromSHA := *fromSHAFlag
	toSHA := *toSHAFlag
	skipCommits := *skipCommitsFlag
	fullPath := *fullPathFlag
	timeoutOption := *timeoutOptionFlag
	// Named differently from the package-level flag pointer to avoid shadowing it.
	searchCuration := *searchCurationFlag
	traversalIds := *traversalIdsFlag
	hashedRootNamespaceId := int16(*hashedRootNamespaceIdFlag) //nolint:gosec
	correlationID := generateCorrelationID()
	archived := *archivedFlag
	schemaVersionBlob := uint16(*schemaVersionBlobFlag)     //nolint:gosec
	schemaVersionCommit := uint16(*schemaVersionCommitFlag) //nolint:gosec
	schemaVersionWiki := uint16(*schemaVersionWikiFlag)     //nolint:gosec

	if traversalIds == "" {
		logkit.WithField("traversalIds", traversalIds).Fatal("Error: traversalIds is empty")
	}

	repo, err := git.NewGitalyClientFromEnv(repoPath, fromSHA, toSHA, correlationID, projectID, fullPath)
	if err != nil {
		logkit.WithFields(
			logkit.Fields{
				"repoPath":      repoPath,
				"fromSHA":       fromSHA,
				"toSHA":         toSHA,
				"correlationID": correlationID,
				"projectID":     projectID,
				"groupID":       groupID,
				"fullPath":      fullPath,
			},
		).WithError(err).Fatal("Error creating gitaly client")
	}

	config, err := loadConfig(projectID, groupID, searchCuration, traversalIds, hashedRootNamespaceId, archived, schemaVersionBlob, schemaVersionCommit, schemaVersionWiki)
	if err != nil {
		logkit.WithError(err).WithFields(
			logkit.Fields{
				"projectID":             projectID,
				"groupID":               groupID,
				"searchCurationFlag":    searchCuration,
				"traversalIds":          traversalIds,
				"hashedRootNamespaceId": hashedRootNamespaceId,
				"archived":              archived,
				"schemaVersionBlob":     schemaVersionBlob,
				"schemaVersionCommit":   schemaVersionCommit,
				"schemaVersionWiki":     schemaVersionWiki,
			}).Fatalf("Error loading config")
	}

	esClient, err := elastic.NewClient(config, correlationID)
	if err != nil {
		logkit.WithError(err).Fatal("Error creating elastic client")
	}

	// A schema version of 0 means the flag was not supplied; each document
	// type that is about to be indexed must have one.
	if esClient.IndexNameWikis != "" && blobType == "wiki_blob" && schemaVersionWiki == 0 {
		logkit.WithField("schemaVersionWiki", schemaVersionWiki).Fatal("Error: schemaVersionWiki is empty")
	}

	if !skipCommits && schemaVersionCommit == 0 {
		logkit.WithField("schemaVersionCommit", schemaVersionCommit).Fatal("Error: schemaVersionCommit is empty")
	}

	if blobType == "blob" && schemaVersionBlob == 0 {
		logkit.WithField("schemaVersionBlob", schemaVersionBlob).Fatal("Error: schemaVersionBlob is empty")
	}

	if timeoutOption != "" {
		timeout, err := time.ParseDuration(timeoutOption)
		if err != nil {
			logkit.WithError(err).WithField("timeoutOption", timeoutOption).Fatalf("Error parsing timeout")
		}

		logkit.WithField("timeout", timeout).Info("Setting timeout")

		// The timer fires on its own goroutine and ends the run through a
		// Fatal-level log entry once the deadline passes.
		time.AfterFunc(timeout, func() {
			timedOutErr := errors.New("TimedOut")
			logkit.WithError(timedOutErr).WithField("timeout", timeout).Fatalf("The process has timed out")
		})
	}

	idx := indexer.NewIndexer(repo, esClient)

	logkit.WithFields(
		logkit.Fields{
			"IndexNameDefault":      esClient.IndexNameDefault,
			"IndexNameCommits":      esClient.IndexNameCommits,
			"IndexNameWikis":        esClient.IndexNameWikis,
			"projectID":             esClient.ParentID(),
			"blobType":              blobType,
			"skipCommits":           skipCommits,
			"searchCuration":        config.SearchCuration,
			"Permissions":           config.Permissions,
			"PermissionsWiki":       config.PermissionsWiki,
			"traversalIds":          traversalIds,
			"hashedRootNamespaceId": hashedRootNamespaceId,
			"archived":              archived,
			"schemaVersionBlob":     schemaVersionBlob,
			"schemaVersionCommit":   schemaVersionCommit,
			"schemaVersionWiki":     schemaVersionWiki,
		},
	).Debugf("Indexing from %s to %s", repo.FromHash, repo.ToHash)

	if err := idx.IndexBlobs(blobType); err != nil {
		logkit.WithError(err).Fatalln("Indexing error")
	}

	// Commits are only indexed alongside regular repository blobs.
	if !skipCommits && blobType == "blob" {
		if err := idx.IndexCommits(); err != nil {
			logkit.WithError(err).Fatalln("Indexing error")
		}
	}

	if err := idx.Flush(); err != nil {
		logkit.WithError(err).Fatalln("Flushing error")
	}

	return nil
}
// configureLogger initialises logkit with text output on stdout. The log level
// is "debug" when the DEBUG environment variable is set (to any value,
// including the empty string) and "info" otherwise. It returns whatever
// closer and error logkit.Initialize produces.
func configureLogger() (io.Closer, error) {
	logLevel := "info"
	if _, isSet := os.LookupEnv("DEBUG"); isSet {
		logLevel = "debug"
	}

	options := []logkit.LoggerOption{
		logkit.WithLogLevel(logLevel),
		logkit.WithFormatter("text"),
		logkit.WithOutputName("stdout"),
	}

	return logkit.Initialize(options...)
}
// loadConfig reads the Elasticsearch configuration via elastic.ConfigFromEnv
// and then overlays the values supplied by the caller, together with the
// project and wiki permissions derived from the command-line flags.
//
// If elastic.ConfigFromEnv fails, its error is returned with a nil config and
// nothing is dereferenced; previously the config was written to before the
// error was inspected.
func loadConfig(projectID int64, groupID int64, searchCuration bool, traversalIds string, hashedRootNamespaceId int16, archived string, schemaVersionBlob uint16, schemaVersionCommit uint16, schemaVersionWiki uint16) (*elastic.Config, error) {
	config, err := elastic.ConfigFromEnv()
	if err != nil {
		return nil, err
	}

	config.Permissions = generateProjectPermissions()
	config.PermissionsWiki = generateWikiPermissions()
	config.ProjectID = projectID
	config.GroupID = groupID
	config.SearchCuration = searchCuration
	config.TraversalIDs = traversalIds
	config.HashedRootNamespaceId = hashedRootNamespaceId
	config.Archived = archived
	config.SchemaVersionBlob = schemaVersionBlob
	config.SchemaVersionCommit = schemaVersionCommit
	config.SchemaVersionWiki = schemaVersionWiki

	return config, nil
}
// generateCorrelationID returns the correlation ID for this run. A non-empty
// value of the environment variable named by envCorrelationIDKey wins;
// otherwise a random ID is requested from the correlation package. If that
// request fails the error is logged and whatever ID value it returned is
// passed back unchanged.
func generateCorrelationID() string {
	if fromEnv := os.Getenv(envCorrelationIDKey); fromEnv != "" {
		return fromEnv
	}

	randomID, err := correlation.RandomID()
	if err != nil {
		// Not expected to happen; the failure is only logged and the
		// returned value is still used.
		logkit.WithError(err).Error("Unable to generate random correlation ID")
	}

	return randomID
}
// generateProjectPermissions builds the project permissions from the
// visibility-level and repository-access-level flags. It returns nil when
// either flag still holds -1; otherwise both values are narrowed to int8.
func generateProjectPermissions() *indexer.ProjectPermissions {
	visibility, repoAccess := *visibilityLevelFlag, *repositoryAccessLevelFlag

	if visibility != -1 && repoAccess != -1 {
		return &indexer.ProjectPermissions{
			VisibilityLevel:       int8(visibility), //nolint:gosec
			RepositoryAccessLevel: int8(repoAccess), //nolint:gosec
		}
	}

	return nil
}
// generateWikiPermissions builds the wiki permissions from the
// visibility-level and wiki-access-level flags. It returns nil when either
// flag still holds -1; otherwise both values are narrowed to int8.
func generateWikiPermissions() *indexer.WikiPermissions {
	visibility, wikiAccess := *visibilityLevelFlag, *wikiAccessLevelFlag

	if visibility != -1 && wikiAccess != -1 {
		return &indexer.WikiPermissions{
			VisibilityLevel: int8(visibility), //nolint:gosec
			WikiAccessLevel: int8(wikiAccess), //nolint:gosec
		}
	}

	return nil
}