internal/pkg/testing/setup.go (112 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. //go:build integration package testing import ( "bytes" "context" "encoding/json" "errors" "testing" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-ucfg/yaml" "github.com/rs/xid" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dsl" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil" ) var defaultCfg config.Config var defaultCfgData = []byte(` output: elasticsearch: hosts: '${ELASTICSEARCH_HOSTS:localhost:9200}' service_token: '${ELASTICSEARCH_SERVICE_TOKEN:test-token}' fleet: agent: id: 1e4954ce-af37-4731-9f4a-407b08e69e42 `) func init() { c, err := yaml.NewConfig(defaultCfgData, config.DefaultOptions...) if err != nil { panic(err) } err = c.Unpack(&defaultCfg, config.DefaultOptions...) if err != nil { panic(err) } } func SetupES(ctx context.Context, t *testing.T) *elasticsearch.Client { t.Helper() cli, err := es.NewClient(ctx, &defaultCfg, false) if err != nil { t.Fatalf("Unable to create elasticsearch client: %v", err) } return cli } func SetupBulk(ctx context.Context, t *testing.T, opts ...bulk.BulkOpt) bulk.Bulk { t.Helper() cli := SetupES(ctx, t) opts = append(opts, bulk.BulkOptsFromCfg(&defaultCfg)...) bulker := bulk.NewBulker(cli, nil, opts...) go func() { _ = bulker.Run(ctx) }() return bulker } func SetupIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, mapping string) string { t.Helper() index := xid.New().String() err := esutil.EnsureIndex(ctx, bulker.Client(), index, mapping) if err != nil { t.Fatal(err) } return index } func SetupIndexWithBulk(ctx context.Context, t *testing.T, mapping string, opts ...bulk.BulkOpt) (string, bulk.Bulk) { t.Helper() bulker := SetupBulk(ctx, t, opts...) index := SetupIndex(ctx, t, bulker, mapping) return index, bulker } func SetupCleanIndex(ctx context.Context, t *testing.T, index string, opts ...bulk.BulkOpt) (string, bulk.Bulk) { bulker := SetupBulk(ctx, t, opts...) CleanIndex(ctx, t, bulker, index) return index, bulker } func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index string) string { t.Helper() tmpl := dsl.NewTmpl() root := dsl.NewRoot() root.Query().MatchAll() q := tmpl.MustResolve(root) query, err := q.Render(make(map[string]interface{})) if err != nil { t.Fatalf("could not clean index: failed to render query template: %v", err) } cli := bulker.Client() res, err := cli.API.DeleteByQuery([]string{index}, bytes.NewReader(query), cli.API.DeleteByQuery.WithContext(ctx), cli.API.DeleteByQuery.WithRefresh(true), ) if err != nil { t.Fatalf("could not clean index %s, DeleteByQuery failed: %v", index, err) } defer res.Body.Close() var esres es.DeleteByQueryResponse err = json.NewDecoder(res.Body).Decode(&esres) if err != nil { t.Fatalf("could not decode ES response: %v", err) } if res.IsError() { err = es.TranslateError(res.StatusCode, esres.Error) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { err = nil } } } if err != nil { t.Fatalf("ES returned an error: %v. body: %q", err, res) } return index }