functionaltests/utils.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functionaltests

import (
	"context"
	"flag"
	"fmt"
	"maps"
	"os"
	"slices"
	"strconv"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"

	"github.com/elastic/go-elasticsearch/v8/typedapi/types"

	"github.com/elastic/apm-server/functionaltests/internal/ecclient"
	"github.com/elastic/apm-server/functionaltests/internal/esclient"
	"github.com/elastic/apm-server/functionaltests/internal/gen"
	"github.com/elastic/apm-server/functionaltests/internal/kbclient"
	"github.com/elastic/apm-server/functionaltests/internal/terraform"
)

var (
	// cleanupOnFailure determines whether the created resources should be cleaned up on test failure.
	cleanupOnFailure = flag.Bool(
		"cleanup-on-failure",
		true,
		"Whether to run cleanup even if the test failed.",
	)
	// target is the Elastic Cloud environment to target with these tests.
	// We use 'pro' for production, as that is the key used to retrieve EC_API_KEY from secret storage.
	target = flag.String(
		"target",
		"pro",
		"The target environment to run tests against. Valid values are: qa, pro.",
	)
)

const (
	// managedByDSL is the constant string used by Elasticsearch to specify that an index
	// is managed by Data Stream Lifecycle.
	managedByDSL = "Data stream lifecycle"
	// managedByILM is the constant string used by Elasticsearch to specify that an index
	// is managed by Index Lifecycle Management.
	managedByILM = "Index Lifecycle Management"
)

var (
	// fetchedCandidates are the build-candidate stack versions prefetched from the Elastic Cloud API.
	fetchedCandidates ecclient.StackVersionInfos
	// fetchedSnapshots are the snapshot stack versions prefetched from the Elastic Cloud API.
	fetchedSnapshots ecclient.StackVersionInfos
	// fetchedVersions are the non-snapshot stack versions prefetched from the Elastic Cloud API.
	fetchedVersions ecclient.StackVersionInfos
)

// getLatestVersionOrSkip retrieves the latest non-snapshot version for the version prefix.
// If no such version is found, the test is skipped via t.Skip.
func getLatestVersionOrSkip(t *testing.T, prefix string) ecclient.StackVersionInfo {
	t.Helper()
	version, ok := fetchedVersions.LatestFor(prefix)
	if !ok {
		t.Skipf("version for '%s' not found in EC region %s, skipping test", prefix, regionFrom(*target))
		return ecclient.StackVersionInfo{}
	}
	return version
}
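// For illustration, a test that should only run when some 8.x release is
// available in the target region might start like this (a sketch, not part of
// the original file; the bare major-version prefix assumes
// ecclient.StackVersionInfos.LatestFor matches by version prefix, as its usage
// in this file suggests):
//
//	func TestOnLatest8x(t *testing.T) {
//		info := getLatestVersionOrSkip(t, "8")
//		t.Logf("running against %s", info.Version)
//	}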
// getLatestBCOrSkip retrieves the latest build-candidate (BC) version for the version prefix.
// If no such version is found, the test is skipped via t.Skip.
func getLatestBCOrSkip(t *testing.T, prefix string) ecclient.StackVersionInfo {
	t.Helper()
	candidate, ok := fetchedCandidates.LatestFor(prefix)
	if !ok {
		t.Skipf("BC for '%s' not found in EC region %s, skipping test", prefix, regionFrom(*target))
		return ecclient.StackVersionInfo{}
	}

	// Check that the BC version is actually the latest, otherwise skip the test.
	versionInfo := getLatestVersionOrSkip(t, prefix)
	if versionInfo.Version.Major != candidate.Version.Major {
		t.Skipf("BC for '%s' is invalid in EC region %s, skipping test", prefix, regionFrom(*target))
		return ecclient.StackVersionInfo{}
	}
	if versionInfo.Version.Minor > candidate.Version.Minor {
		t.Skipf("BC for '%s' is older than the latest released version in EC region %s, skipping test", prefix, regionFrom(*target))
		return ecclient.StackVersionInfo{}
	}
	return candidate
}

// getLatestSnapshot retrieves the latest snapshot version for the version prefix.
func getLatestSnapshot(t *testing.T, prefix string) ecclient.StackVersionInfo {
	t.Helper()
	version, ok := fetchedSnapshots.LatestFor(prefix)
	require.True(t, ok, "snapshot for '%s' not found in EC region %s", prefix, regionFrom(*target))
	return version
}

// expectedDataStreamsIngest represents the expected number of ingested documents
// per data stream after a single ingestion run.
//
// NOTE: The aggregation data streams have negative counts, because they are
// expected to appear but their document counts should not be asserted.
func expectedDataStreamsIngest(namespace string) esclient.DataStreamsDocCount {
	return map[string]int{
		fmt.Sprintf("traces-apm-%s", namespace):                     15013,
		fmt.Sprintf("metrics-apm.app.opbeans_python-%s", namespace): 1437,
		fmt.Sprintf("metrics-apm.internal-%s", namespace):           1351,
		fmt.Sprintf("logs-apm.error-%s", namespace):                 364,
		// Ignore aggregation data streams.
		fmt.Sprintf("metrics-apm.service_destination.1m-%s", namespace): -1,
		fmt.Sprintf("metrics-apm.service_transaction.1m-%s", namespace): -1,
		fmt.Sprintf("metrics-apm.service_summary.1m-%s", namespace):     -1,
		fmt.Sprintf("metrics-apm.transaction.1m-%s", namespace):         -1,
	}
}

// emptyDataStreamsIngest represents an empty ingestion.
// It is useful for asserting that document counts did not change after an operation.
//
// NOTE: The aggregation data streams have negative counts, because they are
// expected to appear but their document counts should not be asserted.
func emptyDataStreamsIngest(namespace string) esclient.DataStreamsDocCount {
	return map[string]int{
		fmt.Sprintf("traces-apm-%s", namespace):                     0,
		fmt.Sprintf("metrics-apm.app.opbeans_python-%s", namespace): 0,
		fmt.Sprintf("metrics-apm.internal-%s", namespace):           0,
		fmt.Sprintf("logs-apm.error-%s", namespace):                 0,
		// Ignore aggregation data streams.
		fmt.Sprintf("metrics-apm.service_destination.1m-%s", namespace): -1,
		fmt.Sprintf("metrics-apm.service_transaction.1m-%s", namespace): -1,
		fmt.Sprintf("metrics-apm.service_summary.1m-%s", namespace):     -1,
		fmt.Sprintf("metrics-apm.transaction.1m-%s", namespace):         -1,
	}
}

// allDataStreams returns the names of all data streams that are expected to appear after ingestion.
func allDataStreams(namespace string) []string {
	return slices.Collect(maps.Keys(expectedDataStreamsIngest(namespace)))
}
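// assertDocCountEqual is a minimal sketch of how the DataStreamsDocCount maps
// above are meant to be consumed: compare observed counts against expected
// counts, only asserting existence for data streams with a negative expected
// count. It is a hypothetical helper added for illustration, not part of the
// original file; the real assertions live elsewhere in this package.
func assertDocCountEqual(t *testing.T, expected, actual esclient.DataStreamsDocCount) {
	t.Helper()
	for ds, want := range expected {
		if want < 0 {
			// Negative counts mark data streams (e.g. aggregation streams)
			// that must exist but whose document counts are not asserted.
			_, ok := actual[ds]
			require.True(t, ok, "expected data stream %s to exist", ds)
			continue
		}
		require.Equal(t, want, actual[ds], "unexpected doc count for data stream %s", ds)
	}
}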
const (
	targetQA = "qa"
	// We use 'pro' because it is the target passed by the Buildkite pipeline
	// running these tests.
	targetProd = "pro"
)

// regionFrom returns the appropriate region to run tests against,
// based on the specified target.
// https://www.elastic.co/guide/en/cloud/current/ec-regions-templates-instances.html
func regionFrom(target string) string {
	switch target {
	case targetQA:
		return "aws-eu-west-1"
	case targetProd:
		return "gcp-us-west2"
	default:
		panic("target value is not accepted")
	}
}

// endpointFrom returns the Elastic Cloud API endpoint for the specified target.
func endpointFrom(target string) string {
	switch target {
	case targetQA:
		return "https://public-api.qa.cld.elstc.co"
	case targetProd:
		return "https://api.elastic-cloud.com"
	default:
		panic("target value is not accepted")
	}
}

// deploymentTemplateFrom returns the deployment template for the specified region.
func deploymentTemplateFrom(region string) string {
	switch region {
	case "aws-eu-west-1":
		return "aws-storage-optimized"
	case "gcp-us-west2":
		return "gcp-storage-optimized"
	default:
		panic("region value is not accepted")
	}
}
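// The three helpers above chain together when configuring a deployment; for
// example (a sketch of the mapping, derived directly from the switches above):
//
//	region := regionFrom(targetProd)           // "gcp-us-west2"
//	template := deploymentTemplateFrom(region) // "gcp-storage-optimized"
//	api := endpointFrom(targetProd)            // "https://api.elastic-cloud.com"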
// formattedTestName returns the test name with path separators replaced,
// so that it can safely be used in file and resource names.
func formattedTestName(t *testing.T) string {
	return strings.ReplaceAll(t.Name(), "/", "_")
}

// terraformDir returns the name of the Terraform files directory for this test.
func terraformDir(t *testing.T) string {
	t.Helper()
	// Flatten the dir name in case of path separators.
	return fmt.Sprintf("tf-%s", formattedTestName(t))
}

// initTerraformRunner copies the static Terraform files to the Terraform directory for this test,
// then initializes the Terraform runner in that directory.
//
// Note: This function will remove all existing files from the test Terraform directory,
// if it exists, before copying into it.
func initTerraformRunner(t *testing.T) *terraform.Runner {
	t.Helper()
	dirName := terraformDir(t)
	err := os.RemoveAll(dirName)
	require.NoError(t, err)
	err = os.CopyFS(terraformDir(t), os.DirFS("infra/terraform"))
	require.NoError(t, err)

	tf, err := terraform.New(t, dirName)
	require.NoError(t, err)
	return tf
}

type deploymentInfo struct {
	// ElasticsearchURL holds the Elasticsearch URL.
	ElasticsearchURL string
	// Username holds the Elasticsearch superuser username for basic auth.
	Username string
	// Password holds the Elasticsearch superuser password for basic auth.
	Password string
	// APMServerURL holds the APM Server URL.
	APMServerURL string
	// KibanaURL holds the Kibana URL.
	KibanaURL string
}

// createCluster runs terraform in the test terraform folder to spin up an Elastic Cloud Hosted
// cluster for testing, and returns a deploymentInfo filled with the relevant cluster information.
// It registers a cleanup function to destroy the created resources when the test finishes,
// honoring the cleanup-on-failure flag for failed tests.
func createCluster(
	t *testing.T,
	ctx context.Context,
	tf *terraform.Runner,
	target string,
	fromVersion ecclient.StackVersion,
	enableIntegrations bool,
) deploymentInfo {
	t.Helper()

	t.Logf("creating deployment version %s", fromVersion)
	// TODO: use a terraform var file for all vars that are not expected to change across upgrades
	// to simplify and clarify this code.
	ecTarget := terraform.Var("ec_target", target)
	ecRegion := terraform.Var("ec_region", regionFrom(target))
	ecDeploymentTpl := terraform.Var("ec_deployment_template", deploymentTemplateFrom(regionFrom(target)))
	version := terraform.Var("stack_version", fromVersion.String())
	integrations := terraform.Var("integrations_server", strconv.FormatBool(enableIntegrations))
	name := terraform.Var("name", formattedTestName(t))
	require.NoError(t, tf.Apply(ctx, ecTarget, ecRegion, ecDeploymentTpl, version, integrations, name))

	t.Cleanup(func() {
		if !t.Failed() || (t.Failed() && *cleanupOnFailure) {
			t.Log("cleanup terraform resources")
			require.NoError(t, tf.Destroy(ctx, ecTarget, ecRegion, ecDeploymentTpl, name, version))
		} else {
			t.Log("test failed and cleanup-on-failure is false, skipping cleanup")
		}
	})

	var deploymentID string
	require.NoError(t, tf.Output("deployment_id", &deploymentID))
	var apmID string
	require.NoError(t, tf.Output("apm_id", &apmID))

	var info deploymentInfo
	require.NoError(t, tf.Output("apm_url", &info.APMServerURL))
	require.NoError(t, tf.Output("es_url", &info.ElasticsearchURL))
	require.NoError(t, tf.Output("username", &info.Username))
	require.NoError(t, tf.Output("password", &info.Password))
	require.NoError(t, tf.Output("kb_url", &info.KibanaURL))

	standaloneOrManaged := "standalone"
	if enableIntegrations {
		standaloneOrManaged = "managed"
	}
	t.Logf("created deployment %s with %s APM (%s)", deploymentID, standaloneOrManaged, apmID)
	return info
}

// upgradeCluster applies the terraform configuration from the test terraform folder,
// upgrading the deployment stack to toVersion.
func upgradeCluster(
	t *testing.T,
	ctx context.Context,
	tf *terraform.Runner,
	target string,
	toVersion ecclient.StackVersion,
	enableIntegrations bool,
) {
	t.Helper()
	t.Logf("upgrade deployment to %s", toVersion)

	ecTarget := terraform.Var("ec_target", target)
	ecRegion := terraform.Var("ec_region", regionFrom(target))
	ecDeploymentTpl := terraform.Var("ec_deployment_template", deploymentTemplateFrom(regionFrom(target)))
	version := terraform.Var("stack_version", toVersion.String())
	integrations := terraform.Var("integrations_server", strconv.FormatBool(enableIntegrations))
	name := terraform.Var("name", formattedTestName(t))
	require.NoError(t, tf.Apply(ctx, ecTarget, ecRegion, ecDeploymentTpl, version, integrations, name))
}

// createESClient instantiates an HTTP API client with dedicated methods to query the Elasticsearch API.
func createESClient(t *testing.T, deployInfo deploymentInfo) *esclient.Client {
	t.Helper()
	t.Log("create elasticsearch client")
	esc, err := esclient.New(deployInfo.ElasticsearchURL, deployInfo.Username, deployInfo.Password)
	require.NoError(t, err)
	return esc
}

// createKibanaClient instantiates an HTTP API client with dedicated methods to query the Kibana API.
func createKibanaClient(t *testing.T, deployInfo deploymentInfo) *kbclient.Client {
	t.Helper()
	t.Log("create kibana client")
	kbc, err := kbclient.New(deployInfo.KibanaURL, deployInfo.Username, deployInfo.Password)
	require.NoError(t, err)
	return kbc
}
// createAPMGenerator instantiates a load generator for APM.
// This function will also create an Elasticsearch API key with full permissions
// to be used by the generator.
func createAPMGenerator(t *testing.T, ctx context.Context, esc *esclient.Client, kbc *kbclient.Client, deployInfo deploymentInfo) *gen.Generator {
	t.Helper()
	t.Log("create apm generator")
	apiKey, err := esc.CreateAPIKey(ctx, "apmgenerator", -1, map[string]types.RoleDescriptor{})
	require.NoError(t, err)
	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
	return gen.New(deployInfo.APMServerURL, apiKey, kbc, logger)
}

// sliceToSet converts a slice into a set, represented as a map from element to true.
func sliceToSet[T comparable](s []T) map[T]bool {
	m := make(map[T]bool)
	for _, ele := range s {
		m[ele] = true
	}
	return m
}

// getAPMDataStreams gets all APM related data streams, excluding any data streams in ignoreDS.
func getAPMDataStreams(t *testing.T, ctx context.Context, esc *esclient.Client, ignoreDS ...string) []types.DataStream {
	t.Helper()
	dataStreams, err := esc.GetDataStream(ctx, "*apm*")
	require.NoError(t, err)

	ignore := sliceToSet(ignoreDS)
	return slices.DeleteFunc(dataStreams, func(ds types.DataStream) bool {
		return ignore[ds.Name]
	})
}

// getDocCountPerDS retrieves the document count per data stream for versions >= 8.0,
// excluding any data streams in ignoreDS.
func getDocCountPerDS(t *testing.T, ctx context.Context, esc *esclient.Client, ignoreDS ...string) esclient.DataStreamsDocCount {
	t.Helper()
	count, err := esc.APMDSDocCount(ctx)
	require.NoError(t, err)

	ignore := sliceToSet(ignoreDS)
	maps.DeleteFunc(count, func(ds string, _ int) bool {
		return ignore[ds]
	})
	return count
}

// getDocCountPerDSV7 retrieves the document count per data stream for versions < 8.0.
func getDocCountPerDSV7(t *testing.T, ctx context.Context, esc *esclient.Client, namespace string) esclient.DataStreamsDocCount {
	t.Helper()
	count, err := esc.APMDSDocCountV7(ctx, namespace)
	require.NoError(t, err)
	return count
}

// getDocCountPerIndexV7 retrieves the document count per index for versions < 8.0.
func getDocCountPerIndexV7(t *testing.T, ctx context.Context, esc *esclient.Client) esclient.IndicesDocCount {
	t.Helper()
	count, err := esc.APMIdxDocCountV7(ctx)
	require.NoError(t, err)
	return count
}

// createRerouteIngestPipeline creates custom pipelines to reroute logs, metrics and traces
// to the different data streams specified by namespace.
func createRerouteIngestPipeline(t *testing.T, ctx context.Context, esc *esclient.Client, namespace string) {
	t.Helper()
	for _, pipeline := range []string{"logs@custom", "metrics@custom", "traces@custom"} {
		err := esc.CreateIngestPipeline(ctx, pipeline, []types.ProcessorContainer{
			{
				Reroute: &types.RerouteProcessor{
					Namespace: []string{namespace},
				},
			},
		})
		require.NoError(t, err)
	}
}

// performManualRollovers rolls over all logs, metrics and traces data streams to new indices.
func performManualRollovers(t *testing.T, ctx context.Context, esc *esclient.Client, namespace string) {
	t.Helper()
	for _, ds := range allDataStreams(namespace) {
		err := esc.PerformManualRollover(ctx, ds)
		require.NoError(t, err)
	}
}
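// exampleUpgradeFlow sketches how the helpers in this file typically compose
// into an upgrade test. It is illustrative only and not part of the original
// file: the version prefixes and integrations-server mode are assumptions, and
// the actual ingestion through the generator is elided.
func exampleUpgradeFlow(t *testing.T, ctx context.Context) {
	t.Helper()
	fromVersion := getLatestVersionOrSkip(t, "8")
	toVersion := getLatestSnapshot(t, "9")

	tf := initTerraformRunner(t)
	info := createCluster(t, ctx, tf, *target, fromVersion.Version, true)
	esc := createESClient(t, info)
	kbc := createKibanaClient(t, info)
	_ = createAPMGenerator(t, ctx, esc, kbc, info) // ingest data here (generator API elided)

	upgradeCluster(t, ctx, tf, *target, toVersion.Version, true)
	_ = getDocCountPerDS(t, ctx, esc) // then assert counts, e.g. with assertDocCountEqual above
}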