systemtest/elasticsearch.go (125 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package systemtest
import (
"context"
"encoding/json"
"io"
"net/url"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/elastic/apm-tools/pkg/espoll"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/elastic/apm-server/systemtest/apmservertest"
)
const (
// adminElasticsearchUser and adminElasticsearchPass are the credentials that
// initElasticSearch sets on the client configuration.
adminElasticsearchUser = "admin"
adminElasticsearchPass = "changeme"
// maxElasticsearchBackoff is the upper bound applied to the retry backoff
// computed in newElasticsearchConfig.
maxElasticsearchBackoff = 10 * time.Second
)
var (
// Elasticsearch is an Elasticsearch client for use in tests.
//
// It is assigned by initElasticSearch, which configures it with the admin
// credentials. The helper functions in this file send their requests
// through it.
Elasticsearch *espoll.Client
)
// initElasticSearch builds the package-level Elasticsearch client from the
// configuration returned by newElasticsearchConfig, authenticating with the
// admin credentials. It panics if the client cannot be constructed.
func initElasticSearch() {
	adminCfg := newElasticsearchConfig()
	adminCfg.Username, adminCfg.Password = adminElasticsearchUser, adminElasticsearchPass

	esClient, err := elasticsearch.NewClient(adminCfg)
	if err != nil {
		panic(err)
	}
	Elasticsearch = &espoll.Client{Client: esClient}
}
// newElasticsearchConfig returns a client configuration whose addresses are
// the Elasticsearch output hosts of apmservertest.DefaultConfig, each turned
// into an "http" URL. No credentials are set; callers fill those in.
//
// MaxRetries is 5, and RetryBackoff doubles from 500ms with each attempt
// (500ms, 1s, 2s, ...) up to a ceiling of maxElasticsearchBackoff.
func newElasticsearchConfig() elasticsearch.Config {
	var addresses []string
	for _, host := range apmservertest.DefaultConfig().Output.Elasticsearch.Hosts {
		u := url.URL{Scheme: "http", Host: host}
		addresses = append(addresses, u.String())
	}
	return elasticsearch.Config{
		Addresses:  addresses,
		MaxRetries: 5,
		RetryBackoff: func(attempt int) time.Duration {
			const initialBackoff = 500 * time.Millisecond
			// Double once per attempt after the first. Stopping as soon as
			// the ceiling is reached keeps the value from overflowing for
			// large attempt numbers, and an attempt below 1 simply yields
			// the initial backoff instead of panicking on a negative shift
			// count.
			backoff := initialBackoff
			for i := 1; i < attempt && backoff < maxElasticsearchBackoff; i++ {
				backoff *= 2
			}
			if backoff > maxElasticsearchBackoff {
				backoff = maxElasticsearchBackoff
			}
			return backoff
		},
	}
}
// CleanupElasticsearch deletes all data streams created by APM Server.
func CleanupElasticsearch(t testing.TB) {
err := cleanupElasticsearch()
require.NoError(t, err)
}
func cleanupElasticsearch() error {
_, err := Elasticsearch.Do(context.Background(), &esapi.IndicesDeleteDataStreamRequest{
Name: []string{
"traces-apm*",
"metrics-apm*",
"logs-apm*",
},
ExpandWildcards: "all",
}, nil)
return err
}
// ChangeUserPassword sends a change-password request for username with the
// given password, failing the test if the request returns an error.
func ChangeUserPassword(t testing.TB, username, password string) {
	payload := map[string]any{"password": password}
	changePassword := esapi.SecurityChangePasswordRequest{
		Username: username,
		Body:     esutil.NewJSONReader(payload),
	}
	_, err := Elasticsearch.Do(context.Background(), changePassword, nil)
	if err != nil {
		t.Fatal(err)
	}
}
// CreateAPIKey creates an Elasticsearch API key with the given name whose
// "apm" role descriptor grants the given privileges on all resources ("*") of
// the "apm" application. It returns the value of the "encoded" field of the
// response.
//
// The test is failed immediately if the request returns an error, if the
// response body cannot be read or parsed as JSON, or if the response has no
// string "encoded" field.
func CreateAPIKey(t testing.TB, name string, privileges []string) string {
	req := esapi.SecurityCreateAPIKeyRequest{
		Body: esutil.NewJSONReader(map[string]any{
			"name": name,
			"role_descriptors": map[string]any{
				"apm": map[string]any{
					"applications": []map[string]any{
						{
							"application": "apm",
							"privileges":  privileges,
							"resources":   []string{"*"},
						},
					},
				},
			},
			"metadata": map[string]any{"application": "apm"},
		}),
	}
	res, err := Elasticsearch.Do(context.Background(), req, nil)
	if err != nil {
		t.Fatal(err)
	}
	// Release the response body once it has been consumed.
	defer res.Body.Close()

	b, err := io.ReadAll(res.Body)
	if err != nil {
		t.Fatal(err)
	}
	var m map[string]any
	if err := json.Unmarshal(b, &m); err != nil {
		t.Fatal(err)
	}
	// A checked assertion turns a missing or non-string field (for example an
	// error response) into a readable test failure rather than a panic.
	encoded, ok := m["encoded"].(string)
	if !ok {
		t.Fatalf("create API key %q: no string \"encoded\" field in response: %s", name, b)
	}
	return encoded
}
// InvalidateAPIKeys invalidates all API Keys for the apm-server user.
func InvalidateAPIKeys(t testing.TB) {
req := esapi.SecurityInvalidateAPIKeyRequest{
Body: esutil.NewJSONReader(map[string]interface{}{
"username": apmservertest.DefaultConfig().Output.Elasticsearch.Username,
}),
}
if _, err := Elasticsearch.Do(context.Background(), req, nil); err != nil {
t.Fatal(err)
}
}
// InvalidateAPIKeyByName sends an invalidate-API-key request selecting keys
// by the given name, failing the test if the request returns an error.
func InvalidateAPIKeyByName(t testing.TB, name string) {
	selector := map[string]any{"name": name}
	invalidate := esapi.SecurityInvalidateAPIKeyRequest{
		Body: esutil.NewJSONReader(selector),
	}
	_, err := Elasticsearch.Do(context.Background(), invalidate, nil)
	if err != nil {
		t.Fatal(err)
	}
}