functionaltests/steps.go (189 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package functionaltests import ( "context" "fmt" "testing" "time" "github.com/stretchr/testify/require" "github.com/elastic/apm-server/functionaltests/internal/asserts" "github.com/elastic/apm-server/functionaltests/internal/ecclient" "github.com/elastic/apm-server/functionaltests/internal/esclient" "github.com/elastic/apm-server/functionaltests/internal/gen" "github.com/elastic/apm-server/functionaltests/internal/kbclient" "github.com/elastic/apm-server/functionaltests/internal/terraform" ) // testStepsRunner consists of composable testStep(s) that is run // in sequence. type testStepsRunner struct { // DataStreamNamespace is the namespace for the APM data streams // that is being tested. Defaults to "default". // // NOTE: Only applicable for stack versions >= 8.0 or 7.x with // integrations enabled. DataStreamNamespace string // Steps are the user defined test steps to be run. Steps []testStep } // Run runs the test steps in sequence, passing the result from the current step // into the next step etc. func (r testStepsRunner) Run(t *testing.T) { if r.DataStreamNamespace == "" { r.DataStreamNamespace = "default" } if len(r.Steps) == 0 { t.Fatal("at least one step must be specified in test run") } if _, ok := r.Steps[0].(createStep); !ok { t.Fatal("first step of test run must be createStep") } start := time.Now() ctx := context.Background() env := testStepEnv{dsNamespace: r.DataStreamNamespace} currentRes := testStepResult{} for _, step := range r.Steps { currentRes = step.Step(t, ctx, &env, currentRes) t.Logf("time elapsed: %s", time.Since(start)) } } // testStepResult contains the results of running the step. type testStepResult struct { // DSDocCount is the data streams document counts that is the // result of this step. // // Note: Only applicable for stack versions >= 8.0. DSDocCount esclient.DataStreamsDocCount // IndicesDocCount is the indices document counts that is the // result of this step. // // Note: Only applicable for stack versions < 8.0. IndicesDocCount esclient.IndicesDocCount } // testStepEnv is the environment of the step that is run. type testStepEnv struct { dsNamespace string versions []ecclient.StackVersion integrations bool tf *terraform.Runner gen *gen.Generator kbc *kbclient.Client esc *esclient.Client } func (env *testStepEnv) currentVersion() ecclient.StackVersion { if len(env.versions) == 0 { panic("test step env current version not found") } return env.versions[len(env.versions)-1] } type testStep interface { Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult } // apmDeploymentMode is the deployment mode of APM in the cluster. // This is used instead of bool to avoid having to use bool pointer // (since the default is true). type apmDeploymentMode uint8 const ( apmDefault apmDeploymentMode = iota apmManaged apmStandalone ) func (mode apmDeploymentMode) enableIntegrations() bool { return mode == apmDefault || mode == apmManaged } // createStep initializes the Terraform runner and deploys an Elastic Cloud // Hosted (ECH) cluster with the provided stack version. It also creates the // necessary clients and set them into testStepEnv. // // The output of this step is the initial document counts in ES. // // Note: This step should always be the first step of any test runs, since it // initializes all the necessary dependencies for subsequent steps. type createStep struct { DeployVersion ecclient.StackVersion APMDeploymentMode apmDeploymentMode } func (c createStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, _ testStepResult) testStepResult { integrations := c.APMDeploymentMode.enableIntegrations() if c.DeployVersion.Major < 8 && integrations { t.Fatal("create step cannot enable integrations for versions < 8.0") } t.Logf("------ cluster setup %s ------", c.DeployVersion) e.tf = initTerraformRunner(t) deployInfo := createCluster(t, ctx, e.tf, *target, c.DeployVersion, integrations) e.esc = createESClient(t, deployInfo) e.kbc = createKibanaClient(t, deployInfo) e.gen = createAPMGenerator(t, ctx, e.esc, e.kbc, deployInfo) // Update the latest environment version to the new one. e.versions = append(e.versions, c.DeployVersion) e.integrations = integrations if e.currentVersion().Major < 8 { return testStepResult{IndicesDocCount: getDocCountPerIndexV7(t, ctx, e.esc)} } return testStepResult{DSDocCount: getDocCountPerDS(t, ctx, e.esc)} } var _ testStep = ingestStep{} // ingestStep performs ingestion to the APM Server deployed on ECH. After // ingestion, it checks if the document counts difference between current // and previous is expected, and if the data streams are in an expected // state. // // The output of this step is the data streams document counts after ingestion. // // NOTE: Only works for versions >= 8.0. type ingestStep struct { CheckDataStream asserts.CheckDataStreamsWant // IgnoreDataStreams are the data streams to be ignored in assertions. // The data stream names can contain '%s' to indicate namespace. IgnoreDataStreams []string // CheckIndividualDataStream is used to check the data streams individually // instead of as a whole using CheckDataStream. // The data stream names can contain '%s' to indicate namespace. CheckIndividualDataStream map[string]asserts.CheckDataStreamIndividualWant } var _ testStep = ingestStep{} func (i ingestStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { if e.currentVersion().Major < 8 { t.Fatal("ingest step should only be used for versions >= 8.0") } t.Logf("------ ingest in %s------", e.currentVersion()) err := e.gen.RunBlockingWait(ctx, e.currentVersion(), e.integrations) require.NoError(t, err) t.Logf("------ ingest check in %s ------", e.currentVersion()) t.Log("check number of documents after ingestion") ignoreDS := formatAll(i.IgnoreDataStreams, e.dsNamespace) dsDocCount := getDocCountPerDS(t, ctx, e.esc, ignoreDS...) asserts.CheckDocCount(t, dsDocCount, previousRes.DSDocCount, expectedDataStreamsIngest(e.dsNamespace)) t.Log("check data streams after ingestion") dataStreams := getAPMDataStreams(t, ctx, e.esc, ignoreDS...) if i.CheckIndividualDataStream != nil { expected := formatAllMap(i.CheckIndividualDataStream, e.dsNamespace) asserts.CheckDataStreamsIndividually(t, expected, dataStreams) } else { asserts.CheckDataStreams(t, i.CheckDataStream, dataStreams) } return testStepResult{DSDocCount: dsDocCount} } func formatAll(formats []string, s string) []string { res := make([]string, 0, len(formats)) for _, format := range formats { res = append(res, fmt.Sprintf(format, s)) } return res } func formatAllMap[T any](m map[string]T, s string) map[string]T { res := make(map[string]T) for k, v := range m { res[fmt.Sprintf(k, s)] = v } return res } // upgradeStep upgrades the ECH deployment from its current version to the new // version. It also adds the new version into testStepEnv. After upgrade, it // checks that the document counts did not change across upgrade. // // The output of this step is the data streams document counts after upgrade. // // NOTE: Only works from versions >= 8.0. type upgradeStep struct { NewVersion ecclient.StackVersion CheckDataStream asserts.CheckDataStreamsWant // IgnoreDataStreams are the data streams to be ignored in assertions. // The data stream names can contain '%s' to indicate namespace. IgnoreDataStreams []string // CheckIndividualDataStream is used to check the data streams individually // instead of as a whole using CheckDataStream. // The data stream names can contain '%s' to indicate namespace. CheckIndividualDataStream map[string]asserts.CheckDataStreamIndividualWant } var _ testStep = upgradeStep{} func (u upgradeStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { if e.currentVersion().Major < 8 { t.Fatal("upgrade step should only be used from versions >= 8.0") } t.Logf("------ upgrade %s to %s ------", e.currentVersion(), u.NewVersion) upgradeCluster(t, ctx, e.tf, *target, u.NewVersion, e.integrations) // Update the environment version to the new one. e.versions = append(e.versions, u.NewVersion) t.Logf("------ upgrade check in %s ------", e.currentVersion()) t.Log("check number of documents across upgrade") // We assert that no changes happened in the number of documents after upgrade // to ensure the state didn't change. // We don't expect any change here unless something broke during the upgrade. ignoreDS := formatAll(u.IgnoreDataStreams, e.dsNamespace) dsDocCount := getDocCountPerDS(t, ctx, e.esc, ignoreDS...) asserts.CheckDocCount(t, dsDocCount, previousRes.DSDocCount, emptyDataStreamsIngest(e.dsNamespace)) t.Log("check data streams after upgrade") dataStreams := getAPMDataStreams(t, ctx, e.esc, ignoreDS...) if u.CheckIndividualDataStream != nil { expected := formatAllMap(u.CheckIndividualDataStream, e.dsNamespace) asserts.CheckDataStreamsIndividually(t, expected, dataStreams) } else { asserts.CheckDataStreams(t, u.CheckDataStream, dataStreams) } return testStepResult{DSDocCount: dsDocCount} } // checkErrorLogsStep checks if there are any unexpected error logs from both // Elasticsearch and APM Server. The provided APMErrorLogsIgnored is used to // ignore some APM logs from being included in the assertion. // // The output of this step is the previous step's result. type checkErrorLogsStep struct { ESErrorLogsIgnored esErrorLogs APMErrorLogsIgnored apmErrorLogs } var _ testStep = checkErrorLogsStep{} func (c checkErrorLogsStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { t.Log("------ check ES and APM error logs ------") t.Log("checking ES error logs") resp, err := e.esc.GetESErrorLogs(ctx, c.ESErrorLogsIgnored.ToQueries()...) require.NoError(t, err) asserts.ZeroESLogs(t, *resp) t.Log("checking APM error logs") resp, err = e.esc.GetAPMErrorLogs(ctx, c.APMErrorLogsIgnored.ToQueries()...) require.NoError(t, err) asserts.ZeroAPMLogs(t, *resp) return previousRes } type stepFunc func(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult // customStep is a custom step to be defined by the user's. The step will run // the provided Func. type customStep struct { Func stepFunc } var _ testStep = customStep{} func (c customStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { return c.Func(t, ctx, e, previousRes) }