func NewClient()

in internal/mode/advanced/elastic/client.go [114:191]


// NewClient builds a Client around an elastic.Client configured from config
// and starts the bulk processor used by the returned wrapper.
//
// When config.RequestTimeout is non-zero it is applied, in seconds, as the
// timeout of the underlying http.Client. When config.AWS is set, that HTTP
// client is wrapped by aws_signing_client for the "es" service in
// config.Region. Every request carries correlationID in the X-Opaque-Id
// header. Sniffing and healthchecks are both disabled.
//
// An error is returned if the AWS signing client, the elastic client, or the
// bulk processor cannot be created.
func NewClient(config *Config, correlationID string) (*Client, error) {
	transport := &http.Client{}
	hasTimeout := config.RequestTimeout != 0
	if hasTimeout {
		transport.Timeout = time.Duration(config.RequestTimeout) * time.Second
	}

	var options []elastic.ClientOptionFunc

	// AWS settings have to come first or they override custom URL, etc.
	switch {
	case config.AWS:
		awsConfig := defaults.Config().WithRegion(config.Region)
		signer := v4.NewSigner(ResolveAWSCredentials(config, awsConfig))
		signingClient, err := aws_signing_client.New(signer, transport, "es", config.Region)
		if err != nil {
			return nil, err
		}
		options = append(options, elastic.SetHttpClient(signingClient))
	case hasTimeout:
		// Only override the library's default HTTP client when a timeout was requested.
		options = append(options, elastic.SetHttpClient(transport))
	}

	// Sniffer should look for HTTPS URLs if at least one initial URL is HTTPS.
	for _, endpoint := range config.URL {
		if !strings.HasPrefix(endpoint, "https:") {
			continue
		}
		options = append(options, elastic.SetScheme("https"))
		break
	}

	// Tag every request with the correlation ID.
	opaqueID := http.Header{}
	opaqueID.Add("X-Opaque-Id", correlationID)

	options = append(
		options,
		elastic.SetHeaders(opaqueID),
		elastic.SetURL(config.URL...),
		elastic.SetSniff(false),
		elastic.SetHealthcheck(false),
	)

	esClient, err := elastic.NewClient(options...)
	if err != nil {
		return nil, err
	}

	wrapped := &Client{
		Client:                esClient,
		IndexNameDefault:      config.IndexNameDefault,
		IndexNameCommits:      config.IndexNameCommits,
		IndexNameWikis:        config.IndexNameWikis,
		ProjectID:             config.ProjectID,
		GroupID:               config.GroupID,
		Permissions:           config.Permissions,
		PermissionsWiki:       config.PermissionsWiki,
		SearchCuration:        config.SearchCuration,
		maxBulkSize:           config.MaxBulkSize,
		traversalIDs:          config.TraversalIDs,
		hashedRootNamespaceId: config.HashedRootNamespaceId,
		archived:              config.Archived,
		schemaVersionBlob:     config.SchemaVersionBlob,
		schemaVersionCommit:   config.SchemaVersionCommit,
		schemaVersionWiki:     config.SchemaVersionWiki,
	}

	// The wrapper must exist before the processor is built, because its
	// afterCallback method is registered as the processor's After hook.
	bulkService := esClient.BulkProcessor().
		Workers(config.BulkWorkers).
		BulkSize(config.MaxBulkSize).
		After(wrapped.afterCallback)

	processor, err := bulkService.Do(context.Background())
	if err != nil {
		return nil, err
	}
	wrapped.bulk = processor

	return wrapped, nil
}