internal/mode/advanced/elastic/client.go

package elastic

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	logkit "gitlab.com/gitlab-org/labkit/log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/defaults"
	v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/deoxxa/aws_signing_client"
	"github.com/olivere/elastic/v7"

	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/indexer"
)

var (
	timeoutError = fmt.Errorf("Timeout")
)

type Client struct {
	IndexNameDefault      string
	traversalIDs          string
	IndexNameCommits      string
	IndexNameWikis        string
	ProjectID             int64
	GroupID               int64
	hashedRootNamespaceId int16
	Permissions           *indexer.ProjectPermissions
	PermissionsWiki       *indexer.WikiPermissions
	maxBulkSize           int
	Client                *elastic.Client
	bulk                  *elastic.BulkProcessor
	bulkFailed            bool
	SearchCuration        bool
	archived              string
	schemaVersionBlob     uint16
	schemaVersionCommit   uint16
	schemaVersionWiki     uint16
}

func BuildIndexName() string {
	railsEnv := os.Getenv("RAILS_ENV")
	var indexName = "gitlab"

	if railsEnv != "" {
		indexName = indexName + "-" + railsEnv
	}

	return indexName
}

// ConfigFromEnv creates a Config from the `ELASTIC_CONNECTION_INFO`
// environment variable
func ConfigFromEnv() (*Config, error) {
	data := strings.NewReader(os.Getenv("ELASTIC_CONNECTION_INFO"))

	config, err := ReadConfig(data)
	if err != nil {
		return nil, fmt.Errorf("Couldn't parse ELASTIC_CONNECTION_INFO: %w", err)
	}

	if config.IndexNameCommits == "" {
		config.IndexNameCommits = BuildIndexName() + "-commits"
	}

	if config.IndexNameDefault == "" {
		config.IndexNameDefault = BuildIndexName()
	}

	return config, nil
}

func (c *Client) afterCallback(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		c.bulkFailed = true

		if elastic.IsStatusCode(err, http.StatusRequestEntityTooLarge) {
			logkit.WithFields(
				logkit.Fields{
					"bulkRequestId":      executionId,
					"maxBulkSizeSetting": c.maxBulkSize,
				},
			).WithError(err).Error("Consider lowering maximum bulk request size and/or increasing http.max_content_length")
		} else {
			logkit.WithFields(
				logkit.Fields{
					"bulkRequestId": executionId,
				},
			).WithError(err).Error("Bulk request failed")
		}
	}

	// The bulk response can be nil in some cases, so we must check it first
	if response != nil && response.Errors {
		failedBulkResponseItems := response.Failed()
		numFailed := len(failedBulkResponseItems)
		if numFailed > 0 {
			c.bulkFailed = true
			total := numFailed + len(response.Succeeded())

			logkit.WithField("bulkRequestId", executionId).Errorf("Bulk request failed to insert %d/%d documents", numFailed, total)
		}

		for i, v := range failedBulkResponseItems {
			logkit.WithField("item", i).Errorf("failed with error %s", v.Error.Reason)
		}
	}
}
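
// NewClient (below) connects to the configured Elasticsearch cluster and wraps
// the connection in a Client. When AWS signing is enabled, the HTTP client is
// replaced with a request-signing one, and a BulkProcessor is started to batch
// the index/delete requests queued by the methods further down.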
func NewClient(config *Config, correlationID string) (*Client, error) {
	var opts []elastic.ClientOptionFunc

	httpClient := &http.Client{}
	if config.RequestTimeout != 0 {
		httpClient.Timeout = time.Duration(config.RequestTimeout) * time.Second
	}

	// AWS settings have to come first or they override custom URL, etc
	if config.AWS {
		awsConfig := defaults.Config().WithRegion(config.Region)
		credentials := ResolveAWSCredentials(config, awsConfig)
		signer := v4.NewSigner(credentials)
		awsClient, err := aws_signing_client.New(signer, httpClient, "es", config.Region)
		if err != nil {
			return nil, err
		}

		opts = append(opts, elastic.SetHttpClient(awsClient))
	} else {
		if config.RequestTimeout != 0 {
			opts = append(opts, elastic.SetHttpClient(httpClient))
		}
	}

	// The sniffer should look for HTTPS URLs if at least one initial URL is HTTPS
	for _, url := range config.URL {
		if strings.HasPrefix(url, "https:") {
			opts = append(opts, elastic.SetScheme("https"))
			break
		}
	}

	headers := http.Header{}
	headers.Add("X-Opaque-Id", correlationID)
	opts = append(opts, elastic.SetHeaders(headers))

	opts = append(opts, elastic.SetURL(config.URL...), elastic.SetSniff(false))
	opts = append(opts, elastic.SetHealthcheck(false))

	client, err := elastic.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	wrappedClient := &Client{
		IndexNameDefault:      config.IndexNameDefault,
		IndexNameCommits:      config.IndexNameCommits,
		IndexNameWikis:        config.IndexNameWikis,
		ProjectID:             config.ProjectID,
		GroupID:               config.GroupID,
		Permissions:           config.Permissions,
		PermissionsWiki:       config.PermissionsWiki,
		maxBulkSize:           config.MaxBulkSize,
		SearchCuration:        config.SearchCuration,
		traversalIDs:          config.TraversalIDs,
		Client:                client,
		hashedRootNamespaceId: config.HashedRootNamespaceId,
		archived:              config.Archived,
		schemaVersionBlob:     config.SchemaVersionBlob,
		schemaVersionCommit:   config.SchemaVersionCommit,
		schemaVersionWiki:     config.SchemaVersionWiki,
	}

	bulk, err := client.BulkProcessor().
		Workers(config.BulkWorkers).
		BulkSize(config.MaxBulkSize).
		After(wrappedClient.afterCallback).
		Do(context.Background())
	if err != nil {
		return nil, err
	}

	wrappedClient.bulk = bulk

	return wrappedClient, nil
}

// ResolveAWSCredentials returns a Credentials object
//
// Order of resolution
//  1. Static Credentials - As configured in Indexer config
//  2. Credentials from other providers
//     2a. Credentials via env variables
//     2b. Credentials via config files
//     2c. ECS Role Credentials
//     2d. EC2 Instance Role Credentials
func ResolveAWSCredentials(config *Config, awsConfig *aws.Config) *credentials.Credentials {
	providers := []credentials.Provider{
		&credentials.StaticProvider{
			Value: credentials.Value{
				AccessKeyID:     config.AccessKey,
				SecretAccessKey: config.SecretKey,
			},
		},
	}
	providers = append(providers, defaults.CredProviders(awsConfig, defaults.Handlers())...)

	return credentials.NewChainCredentials(providers)
}

func (c *Client) ParentID() int64 {
	return c.ProjectID
}

func (c *Client) ParentGroupID() int64 {
	return c.GroupID
}

func (c *Client) ProjectPermissions() *indexer.ProjectPermissions {
	return c.Permissions
}

func (c *Client) WikiPermissions() *indexer.WikiPermissions {
	return c.PermissionsWiki
}

func (c *Client) Flush() error {
	err := c.bulk.Flush()

	if err == nil && c.bulkFailed {
		err = fmt.Errorf("Failed to perform all operations")
	}

	return err
}

func (c *Client) Close() {
	c.Client.Stop()
}

func (c *Client) indexNameFor(documentType string) string {
	if documentType == "commit" && c.IndexNameCommits != "" {
		return c.IndexNameCommits
	} else if documentType == "wiki_blob" && c.IndexNameWikis != "" {
		return c.IndexNameWikis
	} else {
		return c.IndexNameDefault
	}
}

func (c *Client) routingFor(documentType string) string {
	if documentType == "wiki_blob" {
		// Using a short prefix like "n" helps keep the length of the URL short
		return fmt.Sprintf("n_%v", strings.Split(c.TraversalIDs(), "-")[0])
	} else {
		return fmt.Sprintf("project_%v", c.ProjectID)
	}
}
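
// Index (below) queues a bulk request that indexes, or re-indexes, the given
// document. With search curation enabled, the document is first deleted from
// any rolled-over, non-write indices behind the alias so that it only lives in
// the current write index.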
func (c *Client) Index(documentType, id string, thing interface{}) {
	indexName := c.indexNameFor(documentType)
	routing := c.routingFor(documentType)

	req := elastic.NewBulkIndexRequest().
		Index(indexName).
		Routing(routing).
		Id(id).
		Doc(thing)

	if c.SearchCuration {
		if err := c.DeleteFromRolledOverIndices(&DeleteFromRolledOverIndicesParams{
			AliasName: indexName,
			DocType:   documentType,
			DocId:     id,
			Routing:   routing,
		}); err != nil {
			logkit.WithFields(
				logkit.Fields{
					"search_curation": indexName,
					"doc_id":          id,
					"doc_type":        documentType,
				},
			).WithError(err).Info("DeleteFromRolledOverIndices failed")
		}
	}

	c.bulk.Add(req)
}

type DeleteFromRolledOverIndicesParams struct {
	AliasName string
	DocType   string
	DocId     string
	Routing   string
}

func (c *Client) DeleteFromRolledOverIndices(params *DeleteFromRolledOverIndicesParams) error {
	res, err := c.Client.Aliases().
		Index(params.AliasName).
		Pretty(true).
		Do(context.TODO())
	if err != nil {
		return err
	}

	// There are no rolled over indices yet
	if len(res.Indices) <= 1 {
		return nil
	}

	for indexName, indexDetails := range res.Indices {
		for _, aliasInfo := range indexDetails.Aliases {
			if aliasInfo.AliasName != params.AliasName || aliasInfo.IsWriteIndex {
				continue
			}

			logkit.WithFields(
				logkit.Fields{
					"search_curation": indexName,
					"doc_id":          params.DocId,
					"doc_type":        params.DocType,
				},
			).Debugf("Deleting doc `%s` from rollover index %s", params.DocId, indexName)

			c.Delete(&DeleteParams{
				Index:   indexName,
				DocId:   params.DocId,
				Routing: params.Routing,
			})
		}
	}

	return nil
}

// We only really use this for tests
func (c *Client) Get(documentType, id string) (*elastic.GetResult, error) {
	routing := c.routingFor(documentType)

	return c.Client.Get().
		Index(c.indexNameFor(documentType)).
		Routing(routing).
		Id(id).
		Do(context.TODO())
}

func (c *Client) GetCommit(id string) (*elastic.GetResult, error) {
	return c.Get("commit", fmt.Sprintf("%v_%v", c.ProjectID, id))
}

func (c *Client) GetBlob(path string) (*elastic.GetResult, error) {
	return c.Get("blob", fmt.Sprintf("%v_%v", c.ProjectID, path))
}

func (c *Client) GetWikiBlob(path string) (*elastic.GetResult, error) {
	var id string

	if c.IsGroupDocument() {
		id = fmt.Sprintf("g_%v_%v", c.GroupID, path)
	} else {
		id = fmt.Sprintf("p_%v_%v", c.ProjectID, path)
	}

	return c.Get("wiki_blob", id)
}

func (c *Client) Remove(documentType, id string) {
	routing := c.routingFor(documentType)
	req := elastic.NewBulkDeleteRequest().
		Index(c.indexNameFor(documentType)).
		Routing(routing).
		Id(id)

	c.bulk.Add(req)
}

type DeleteParams struct {
	Index   string
	Routing string
	DocId   string
}

// Delete is the same as Remove but with more flexible parameters
func (c *Client) Delete(params *DeleteParams) {
	req := elastic.NewBulkDeleteRequest().
		Index(params.Index).
		Routing(params.Routing).
		Id(params.DocId)

	c.bulk.Add(req)
}

func (c *Client) TraversalIDs() string {
	return c.traversalIDs
}

func (c *Client) Archived() string {
	return c.archived
}

func (c *Client) HashedRootNamespaceId() int16 {
	return c.hashedRootNamespaceId
}

func (c *Client) IsGroupDocument() bool {
	return !c.IsProjectDocument() && c.GroupID > 0
}

func (c *Client) IsProjectDocument() bool {
	return c.ProjectID > 0
}

func (c *Client) SchemaVersionBlob() uint16 {
	return c.schemaVersionBlob
}

func (c *Client) SchemaVersionCommit() uint16 {
	return c.schemaVersionCommit
}

func (c *Client) SchemaVersionWiki() uint16 {
	return c.schemaVersionWiki
}
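
// clientUsageSketch is a minimal, illustrative sketch (not called anywhere in
// the indexer) of how this client is typically driven: read the connection
// info from the environment, index a document, then flush the bulk processor.
// The correlationID, docID and doc arguments are placeholders.
func clientUsageSketch(correlationID, docID string, doc interface{}) error {
	config, err := ConfigFromEnv()
	if err != nil {
		return err
	}

	client, err := NewClient(config, correlationID)
	if err != nil {
		return err
	}
	defer client.Close()

	// Queue a blob document, then push everything still buffered in the
	// BulkProcessor; Flush also reports any failures seen by afterCallback.
	client.Index("blob", docID, doc)

	return client.Flush()
}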