internal/mode/advanced/elastic/client.go (338 lines of code) (raw):
package elastic
import (
"context"
"fmt"
"net/http"
"os"
"strings"
"time"
logkit "gitlab.com/gitlab-org/labkit/log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/deoxxa/aws_signing_client"
"github.com/olivere/elastic/v7"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/advanced/indexer"
)
// timeoutError is a package-level sentinel error with the message "Timeout".
// NOTE(review): it is not referenced in this file; presumably another file in
// the package uses it — confirm before removing.
var timeoutError = fmt.Errorf("Timeout")
// Client wraps an olivere/elastic client together with a bulk processor and
// the per-run indexing context (index names, project/group identity,
// permissions, schema versions) copied from Config in NewClient.
//
// NOTE(review): bulkFailed is written by afterCallback, which NewClient
// registers on a bulk processor configured with config.BulkWorkers workers,
// and is read by Flush without any synchronization. If several workers run
// concurrently this is a data race — consider an atomic flag and confirm
// with `go test -race`.
type Client struct {
// IndexNameDefault is used for every document type that has no dedicated
// index configured (see indexNameFor).
IndexNameDefault string
// traversalIDs is returned by TraversalIDs(); routingFor uses its first
// "-"-separated segment to route "wiki_blob" documents.
traversalIDs string
// IndexNameCommits, when non-empty, is the index for "commit" documents.
IndexNameCommits string
// IndexNameWikis, when non-empty, is the index for "wiki_blob" documents.
IndexNameWikis string
// ProjectID is used for "project_<id>" routing and as a document-id prefix.
ProjectID int64
// GroupID is used for group-level wiki document ids (see GetWikiBlob).
GroupID int64
hashedRootNamespaceId int16
Permissions *indexer.ProjectPermissions
PermissionsWiki *indexer.WikiPermissions
// maxBulkSize is passed to the bulk processor as BulkSize and logged when
// a bulk request is rejected with HTTP 413.
maxBulkSize int
// Client is the underlying olivere/elastic client.
Client *elastic.Client
// bulk receives the index/delete requests queued by Index, Remove and
// Delete; Flush flushes it.
bulk *elastic.BulkProcessor
// bulkFailed is set by afterCallback when a bulk request or any item in it
// fails. It is never reset, so every later Flush reports an error.
bulkFailed bool
// SearchCuration, when true, makes Index first queue deletes of the same
// document from the alias's non-write (rolled-over) indices.
SearchCuration bool
archived string
schemaVersionBlob uint16
schemaVersionCommit uint16
schemaVersionWiki uint16
}
// BuildIndexName returns the default index name. It is "gitlab", followed by
// "-<RAILS_ENV>" when the RAILS_ENV environment variable is non-empty.
func BuildIndexName() string {
	const base = "gitlab"

	env := os.Getenv("RAILS_ENV")
	if env == "" {
		return base
	}
	return base + "-" + env
}
// ConfigFromEnv creates a Config by handing the contents of the
// `ELASTIC_CONNECTION_INFO` environment variable to ReadConfig.
//
// Empty index names are defaulted from BuildIndexName:
// IndexNameDefault becomes BuildIndexName() and IndexNameCommits becomes
// BuildIndexName() + "-commits". A parse failure is returned wrapped.
func ConfigFromEnv() (*Config, error) {
	raw := os.Getenv("ELASTIC_CONNECTION_INFO")

	cfg, err := ReadConfig(strings.NewReader(raw))
	if err != nil {
		return nil, fmt.Errorf("Couldn't parse ELASTIC_CONNECTION_INFO: %w", err)
	}

	base := BuildIndexName()
	if cfg.IndexNameDefault == "" {
		cfg.IndexNameDefault = base
	}
	if cfg.IndexNameCommits == "" {
		cfg.IndexNameCommits = base + "-commits"
	}
	return cfg, nil
}
// afterCallback is registered with the bulk processor in NewClient and runs
// after each bulk commit.
//
// It sets c.bulkFailed, which makes Flush return an error, in two cases:
// when the bulk request itself failed (err != nil), or when the response
// reports errors and contains items with a non-2xx status. Details are
// logged in both cases. An HTTP 413 gets a hint about the bulk size setting.
func (c *Client) afterCallback(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		c.bulkFailed = true
		if elastic.IsStatusCode(err, http.StatusRequestEntityTooLarge) {
			logkit.WithFields(
				logkit.Fields{
					"bulkRequestId":      executionId,
					"maxBulkSizeSetting": c.maxBulkSize,
				},
			).WithError(err).Error("Consider lowering maximum bulk request size or/and increasing http.max_content_length")
		} else {
			logkit.WithFields(
				logkit.Fields{
					"bulkRequestId": executionId,
				},
			).WithError(err).Error("Bulk request failed")
		}
	}

	// The bulk response can be nil (e.g. when the request itself failed), so
	// check it before looking at per-item results.
	if response == nil || !response.Errors {
		return
	}

	failed := response.Failed()
	if len(failed) == 0 {
		return
	}

	c.bulkFailed = true
	total := len(failed) + len(response.Succeeded())
	logkit.WithField("bulkRequestId", executionId).Errorf("Bulk request failed to insert %d/%d documents", len(failed), total)

	for i, item := range failed {
		// Failed() selects items purely by non-2xx status. Such an item can
		// carry no error body at all: a bulk delete of a missing document
		// returns status 404 with result "not_found". Dereferencing
		// item.Error unconditionally would panic in that case.
		if item.Error == nil {
			logkit.WithField("item", i).Errorf("failed with status %d (result %q, index %s, id %s)", item.Status, item.Result, item.Index, item.Id)
			continue
		}
		logkit.WithField("item", i).Errorf("failed with error %s", item.Error.Reason)
	}
}
// NewClient builds a Client from config.
//
// Transport: when config.AWS is set, requests are signed with AWS SigV4 for
// the "es" service in config.Region. Otherwise a plain *http.Client is
// installed only when config.RequestTimeout (seconds) is non-zero, and the
// library default is used when it is not.
//
// Client: the olivere/elastic client targets config.URL with sniffing and
// health checks disabled. It switches to the https scheme if any URL starts
// with "https:". Every request carries an X-Opaque-Id header set to
// correlationID.
//
// Bulk processor: it uses config.BulkWorkers workers and a bulk size of
// config.MaxBulkSize. Its after-callback records failures that Flush later
// reports.
func NewClient(config *Config, correlationID string) (*Client, error) {
	httpClient := &http.Client{}
	if config.RequestTimeout != 0 {
		httpClient.Timeout = time.Duration(config.RequestTimeout) * time.Second
	}

	var opts []elastic.ClientOptionFunc

	// AWS settings have to come first or they override custom URL, etc
	switch {
	case config.AWS:
		awsConfig := defaults.Config().WithRegion(config.Region)
		// Named creds so the imported credentials package is not shadowed.
		creds := ResolveAWSCredentials(config, awsConfig)
		signingClient, err := aws_signing_client.New(v4.NewSigner(creds), httpClient, "es", config.Region)
		if err != nil {
			return nil, err
		}
		opts = append(opts, elastic.SetHttpClient(signingClient))
	case config.RequestTimeout != 0:
		opts = append(opts, elastic.SetHttpClient(httpClient))
	}

	// Sniffer should look for HTTPS URLs if at-least-one initial URL is HTTPS
	for _, u := range config.URL {
		if strings.HasPrefix(u, "https:") {
			opts = append(opts, elastic.SetScheme("https"))
			break
		}
	}

	headers := http.Header{}
	headers.Add("X-Opaque-Id", correlationID)
	opts = append(opts,
		elastic.SetHeaders(headers),
		elastic.SetURL(config.URL...),
		elastic.SetSniff(false),
		elastic.SetHealthcheck(false),
	)

	esClient, err := elastic.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	wrapped := &Client{
		IndexNameDefault:      config.IndexNameDefault,
		traversalIDs:          config.TraversalIDs,
		IndexNameCommits:      config.IndexNameCommits,
		IndexNameWikis:        config.IndexNameWikis,
		ProjectID:             config.ProjectID,
		GroupID:               config.GroupID,
		hashedRootNamespaceId: config.HashedRootNamespaceId,
		Permissions:           config.Permissions,
		PermissionsWiki:       config.PermissionsWiki,
		maxBulkSize:           config.MaxBulkSize,
		Client:                esClient,
		SearchCuration:        config.SearchCuration,
		archived:              config.Archived,
		schemaVersionBlob:     config.SchemaVersionBlob,
		schemaVersionCommit:   config.SchemaVersionCommit,
		schemaVersionWiki:     config.SchemaVersionWiki,
	}

	processor, err := esClient.BulkProcessor().
		Workers(config.BulkWorkers).
		BulkSize(config.MaxBulkSize).
		After(wrapped.afterCallback).
		Do(context.Background())
	if err != nil {
		return nil, err
	}
	wrapped.bulk = processor

	return wrapped, nil
}
// ResolveAWSCredentials returns a chained Credentials object used to sign
// requests to AWS-hosted Elasticsearch.
//
// Order of resolution
// 1. Static Credentials - config.AccessKey / config.SecretKey from the Indexer config
// 2. Credentials from the AWS SDK default providers (defaults.CredProviders)
// 2a. Credentials via env variables
// 2b. Credentials via config files
// 2c. ECS Role Credentials
// 2d. EC2 Instance Role Credentials
func ResolveAWSCredentials(config *Config, awsConfig *aws.Config) *credentials.Credentials {
	static := &credentials.StaticProvider{
		Value: credentials.Value{
			AccessKeyID:     config.AccessKey,
			SecretAccessKey: config.SecretKey,
		},
	}

	chain := append(
		[]credentials.Provider{static},
		defaults.CredProviders(awsConfig, defaults.Handlers())...,
	)
	return credentials.NewChainCredentials(chain)
}
// ParentID returns the project ID the client was configured with (ProjectID).
func (c *Client) ParentID() int64 { return c.ProjectID }

// ParentGroupID returns the group ID the client was configured with (GroupID).
func (c *Client) ParentGroupID() int64 { return c.GroupID }

// ProjectPermissions returns the project permissions copied from Config.
func (c *Client) ProjectPermissions() *indexer.ProjectPermissions { return c.Permissions }

// WikiPermissions returns the wiki permissions copied from Config.
func (c *Client) WikiPermissions() *indexer.WikiPermissions { return c.PermissionsWiki }
// Flush sends all queued bulk requests. It returns the bulk processor's
// flush error if there is one. Otherwise it returns an error when any earlier
// bulk commit was recorded as failed by afterCallback. That flag is never
// cleared, so the error persists across subsequent flushes.
func (c *Client) Flush() error {
	if err := c.bulk.Flush(); err != nil {
		return err
	}
	if c.bulkFailed {
		return fmt.Errorf("Failed to perform all operations")
	}
	return nil
}
// Close stops the underlying olivere/elastic client.
//
// NOTE(review): the bulk processor stored in c.bulk is neither flushed nor
// closed here, so callers presumably call Flush before Close — confirm.
func (c *Client) Close() {
	esClient := c.Client
	esClient.Stop()
}
// indexNameFor maps a document type to the index it is written to.
// "commit" and "wiki_blob" documents go to their dedicated index when one is
// configured. Everything else goes to IndexNameDefault, including those two
// types when their dedicated index is empty.
func (c *Client) indexNameFor(documentType string) string {
	switch {
	case documentType == "commit" && c.IndexNameCommits != "":
		return c.IndexNameCommits
	case documentType == "wiki_blob" && c.IndexNameWikis != "":
		return c.IndexNameWikis
	}
	return c.IndexNameDefault
}
// routingFor returns the shard routing value for a document type.
// "wiki_blob" documents are routed by "n_" plus the first "-"-separated
// segment of TraversalIDs(). All other types are routed by
// "project_<ProjectID>".
func (c *Client) routingFor(documentType string) string {
	if documentType != "wiki_blob" {
		return fmt.Sprintf("project_%v", c.ProjectID)
	}
	// Using short string like n will help to keep the length of the url short
	rootID := strings.SplitN(c.TraversalIDs(), "-", 2)[0]
	return fmt.Sprintf("n_%v", rootID)
}
// Index queues a bulk index request for thing under id. The index and
// routing come from documentType.
//
// When SearchCuration is enabled, it first queues deletes of the same id from
// the alias's non-write (rolled-over) indices. A failure in that step is
// logged and does not prevent indexing.
func (c *Client) Index(documentType, id string, thing interface{}) {
	indexName := c.indexNameFor(documentType)
	routing := c.routingFor(documentType)

	if c.SearchCuration {
		params := &DeleteFromRolledOverIndicesParams{
			AliasName: indexName,
			DocType:   documentType,
			DocId:     id,
			Routing:   routing,
		}
		if err := c.DeleteFromRolledOverIndices(params); err != nil {
			logkit.WithFields(logkit.Fields{
				"search_curation": indexName,
				"doc_id":          id,
				"doc_type":        documentType,
			}).WithError(err).Info("DeleteFromRolledOverIndices failed")
		}
	}

	c.bulk.Add(elastic.NewBulkIndexRequest().
		Index(indexName).
		Routing(routing).
		Id(id).
		Doc(thing))
}
// DeleteFromRolledOverIndicesParams describes a document to remove from the
// non-write indices behind an alias (consumed by DeleteFromRolledOverIndices).
type DeleteFromRolledOverIndicesParams struct {
// AliasName is the alias whose backing indices are looked up.
AliasName string
// DocType is the document type; DeleteFromRolledOverIndices only uses it
// for logging.
DocType string
// DocId is the id of the document to delete.
DocId string
// Routing is the routing value sent with each queued delete.
Routing string
}
// DeleteFromRolledOverIndices looks up the indices behind params.AliasName.
// When there is more than one, it queues a bulk delete of params.DocId (with
// params.Routing) in every index that carries the alias but is not its write
// index.
//
// The only returned error is from the alias lookup. The deletes are merely
// queued on the bulk processor, so their outcome is handled by its
// after-callback.
func (c *Client) DeleteFromRolledOverIndices(params *DeleteFromRolledOverIndicesParams) error {
	aliases, err := c.Client.Aliases().
		Index(params.AliasName).
		Pretty(true).
		Do(context.TODO())
	if err != nil {
		return err
	}

	// A single backing index means nothing has been rolled over yet.
	if len(aliases.Indices) < 2 {
		return nil
	}

	for index, details := range aliases.Indices {
		for _, alias := range details.Aliases {
			rolledOver := alias.AliasName == params.AliasName && !alias.IsWriteIndex
			if !rolledOver {
				continue
			}

			logkit.WithFields(logkit.Fields{
				"search_curation": index,
				"doc_id":          params.DocId,
				"doc_type":        params.DocType,
			}).Debugf("Deleting doc `%s` from rollover index %s", params.DocId, index)

			c.Delete(&DeleteParams{
				Index:   index,
				DocId:   params.DocId,
				Routing: params.Routing,
			})
		}
	}
	return nil
}
// Get fetches a single document by id. The index and routing come from
// documentType.
// We only really use this for tests.
func (c *Client) Get(documentType, id string) (*elastic.GetResult, error) {
	svc := c.Client.Get().
		Index(c.indexNameFor(documentType)).
		Routing(c.routingFor(documentType)).
		Id(id)
	return svc.Do(context.TODO())
}
// GetCommit fetches the "commit" document whose id is "<ProjectID>_<id>".
func (c *Client) GetCommit(id string) (*elastic.GetResult, error) {
	docID := fmt.Sprintf("%v_%v", c.ProjectID, id)
	return c.Get("commit", docID)
}

// GetBlob fetches the "blob" document whose id is "<ProjectID>_<path>".
func (c *Client) GetBlob(path string) (*elastic.GetResult, error) {
	docID := fmt.Sprintf("%v_%v", c.ProjectID, path)
	return c.Get("blob", docID)
}
// GetWikiBlob fetches a "wiki_blob" document.
// Group wikis use the id "g_<GroupID>_<path>" and project wikis use
// "p_<ProjectID>_<path>". The group form is chosen when IsGroupDocument()
// reports true.
func (c *Client) GetWikiBlob(path string) (*elastic.GetResult, error) {
	prefix, ownerID := "p", c.ProjectID
	if c.IsGroupDocument() {
		prefix, ownerID = "g", c.GroupID
	}
	return c.Get("wiki_blob", fmt.Sprintf("%s_%v_%v", prefix, ownerID, path))
}
// Remove queues a bulk delete of document id. The index and routing come
// from documentType.
func (c *Client) Remove(documentType, id string) {
	c.Delete(&DeleteParams{
		Index:   c.indexNameFor(documentType),
		Routing: c.routingFor(documentType),
		DocId:   id,
	})
}
// DeleteParams identifies a single document for Delete.
type DeleteParams struct {
// Index is the name of the index to delete from.
Index string
// Routing is the routing value for the delete request.
Routing string
// DocId is the id of the document to delete.
DocId string
}
// Delete queues a bulk delete for an explicitly given index, routing and
// document id. It is the same as Remove but with more flexible parameters:
// no document-type lookup is done.
func (c *Client) Delete(params *DeleteParams) {
	c.bulk.Add(
		elastic.NewBulkDeleteRequest().
			Id(params.DocId).
			Index(params.Index).
			Routing(params.Routing),
	)
}
// TraversalIDs returns the traversal IDs string copied from Config.
func (c *Client) TraversalIDs() string { return c.traversalIDs }

// Archived returns the archived value copied from Config.
func (c *Client) Archived() string { return c.archived }

// HashedRootNamespaceId returns the hashed root namespace id copied from Config.
func (c *Client) HashedRootNamespaceId() int16 { return c.hashedRootNamespaceId }
// IsGroupDocument reports whether documents belong to a group. That is the
// case when no project is set (IsProjectDocument is false) and GroupID is
// positive.
func (c *Client) IsGroupDocument() bool {
	if c.IsProjectDocument() {
		return false
	}
	return c.GroupID > 0
}

// IsProjectDocument reports whether a positive ProjectID is set.
func (c *Client) IsProjectDocument() bool {
	return c.ProjectID > 0
}
// SchemaVersionBlob returns the blob schema version copied from Config.
func (c *Client) SchemaVersionBlob() uint16 { return c.schemaVersionBlob }

// SchemaVersionCommit returns the commit schema version copied from Config.
func (c *Client) SchemaVersionCommit() uint16 { return c.schemaVersionCommit }

// SchemaVersionWiki returns the wiki schema version copied from Config.
func (c *Client) SchemaVersionWiki() uint16 { return c.schemaVersionWiki }