functionaltests/steps_v7.go (131 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package functionaltests import ( "context" "fmt" "testing" "time" "github.com/stretchr/testify/require" "github.com/elastic/apm-server/functionaltests/internal/asserts" "github.com/elastic/apm-server/functionaltests/internal/ecclient" "github.com/elastic/apm-server/functionaltests/internal/esclient" ) func expectedIndicesIngest() esclient.IndicesDocCount { return esclient.IndicesDocCount{ "apm-*-error-*": 364, "apm-*-profile-*": 0, "apm-*-span-*": 10885, "apm-*-transaction-*": 4128, // Ignore aggregation indices. "apm-*-metric-*": -1, "apm-*-onboarding-*": -1, } } func emptyIndicesIngest() esclient.IndicesDocCount { return esclient.IndicesDocCount{ "apm-*-error-*": 0, "apm-*-profile-*": 0, "apm-*-span-*": 0, "apm-*-transaction-*": 0, "apm-*-onboarding-*": 0, "apm-*-metric-*": -1, } } func expectedDataStreamsIngestV7(namespace string) esclient.DataStreamsDocCount { return map[string]int{ fmt.Sprintf("traces-apm-%s", namespace): 15013, fmt.Sprintf("logs-apm.error-%s", namespace): 364, fmt.Sprintf("metrics-apm.app.opbeans_python-%s", namespace): 1492, fmt.Sprintf("metrics-apm.app.opbeans_node-%s", namespace): 27, fmt.Sprintf("metrics-apm.app.opbeans_go-%s", namespace): 11, fmt.Sprintf("metrics-apm.app.opbeans_ruby-%s", namespace): 24, // Document count fluctuates constantly. fmt.Sprintf("metrics-apm.internal-%s", namespace): -1, } } func emptyDataStreamsIngestV7(namespace string) esclient.DataStreamsDocCount { return map[string]int{ fmt.Sprintf("traces-apm-%s", namespace): 0, fmt.Sprintf("metrics-apm.app.opbeans_python-%s", namespace): 0, fmt.Sprintf("metrics-apm.app.opbeans_node-%s", namespace): 0, fmt.Sprintf("metrics-apm.app.opbeans_go-%s", namespace): 0, fmt.Sprintf("metrics-apm.app.opbeans_ruby-%s", namespace): 0, fmt.Sprintf("metrics-apm.internal-%s", namespace): 0, fmt.Sprintf("logs-apm.error-%s", namespace): 0, } } // ingestV7Step performs ingestion to the APM Server deployed on ECH. // After ingestion, it checks if the document counts difference between // current and previous is expected. // // The output of this step is the indices document counts after ingestion. // // NOTE: Only works for versions 7.x. type ingestV7Step struct{} var _ testStep = ingestV7Step{} func (i ingestV7Step) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { if e.currentVersion().Major >= 8 { t.Fatal("ingest v7 step should only be used for versions < 8.0") } t.Logf("------ ingest in %s ------", e.currentVersion()) err := e.gen.RunBlockingWait(ctx, e.currentVersion(), e.integrations) require.NoError(t, err) t.Logf("------ ingest check in %s ------", e.currentVersion()) t.Log("check number of documents after ingestion") // Standalone, check indices. if !e.integrations { idxDocCount := getDocCountPerIndexV7(t, ctx, e.esc) asserts.CheckDocCountV7(t, idxDocCount, previousRes.IndicesDocCount, expectedIndicesIngest()) return testStepResult{IndicesDocCount: idxDocCount} } // Managed, check data streams dsDocCount := getDocCountPerDSV7(t, ctx, e.esc, e.dsNamespace) asserts.CheckDocCount(t, dsDocCount, previousRes.DSDocCount, expectedDataStreamsIngestV7(e.dsNamespace)) return testStepResult{DSDocCount: dsDocCount} } // upgradeV7Step upgrades the ECH deployment from its current version to // the new version. It also adds the new version into testStepEnv. After // upgrade, it checks that the document counts did not change across upgrade. // // The output of this step is the indices document counts if upgrading to 7.x, // or data streams document counts if upgrading to >= 8.0. // // NOTE: Only works from versions 7.x. type upgradeV7Step struct { NewVersion ecclient.StackVersion } var _ testStep = upgradeV7Step{} func (u upgradeV7Step) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { if e.currentVersion().Major >= 8 { t.Fatal("upgrade v7 step should only be used from versions < 8.0") } t.Logf("------ upgrade %s to %s ------", e.currentVersion(), u.NewVersion) upgradeCluster(t, ctx, e.tf, *target, u.NewVersion, e.integrations) // Update the environment version to the new one. e.versions = append(e.versions, u.NewVersion) t.Logf("------ upgrade check in %s ------", e.currentVersion()) t.Log("check number of documents across upgrade") // We assert that no changes happened in the number of documents after upgrade // to ensure the state didn't change. // We don't expect any change here unless something broke during the upgrade. if !e.integrations { // Standalone, return indices even if upgraded to >= 8.0, since indices // will simply be ignored by 8.x checks. idxDocCount := getDocCountPerIndexV7(t, ctx, e.esc) asserts.CheckDocCountV7(t, idxDocCount, previousRes.IndicesDocCount, emptyIndicesIngest()) return testStepResult{IndicesDocCount: idxDocCount} } // Managed, should be data streams regardless of upgrade. dsDocCount := getDocCountPerDSV7(t, ctx, e.esc, e.dsNamespace) asserts.CheckDocCount(t, dsDocCount, previousRes.DSDocCount, emptyDataStreamsIngestV7(e.dsNamespace)) return testStepResult{DSDocCount: getDocCountPerDS(t, ctx, e.esc)} } // migrateManagedStep migrates the ECH APM deployment from standalone mode to // managed mode, which involves enabling the integrations server via Kibana. // It also checks that the document counts did not change across the migration. // // The output of this step is the indices document counts if version < 8.0, // or data streams document counts if version >= 8.0. type migrateManagedStep struct{} var _ testStep = migrateManagedStep{} func (m migrateManagedStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { if e.integrations { t.Fatal("migrate managed step should only be used on standalone") } t.Logf("------ migrate to managed for %s ------", e.currentVersion()) t.Log("enable integrations server") err := e.kbc.EnableIntegrationsServer(ctx) require.NoError(t, err) e.integrations = true // APM Server needs some time to start serving requests again, and we don't have any // visibility on when this completes. // NOTE: This value comes from empirical observations. time.Sleep(80 * time.Second) t.Log("check number of documents across migration to managed") // We assert that no changes happened in the number of documents after migration // to ensure the state didn't change. // We don't expect any change here unless something broke during the migration. if e.currentVersion().Major < 8 { idxDocCount := getDocCountPerIndexV7(t, ctx, e.esc) asserts.CheckDocCountV7(t, idxDocCount, previousRes.IndicesDocCount, emptyIndicesIngest()) return testStepResult{IndicesDocCount: idxDocCount} } dsDocCount := getDocCountPerDS(t, ctx, e.esc) asserts.CheckDocCount(t, dsDocCount, previousRes.DSDocCount, emptyDataStreamsIngest(e.dsNamespace)) return testStepResult{DSDocCount: dsDocCount} } // resolveDeprecationsStep resolves critical migration deprecation warnings from Elasticsearch regarding // indices created in 7.x not being compatible with 9.x. // // The output of this step is the previous test step result. type resolveDeprecationsStep struct{} var _ testStep = resolveDeprecationsStep{} func (r resolveDeprecationsStep) Step(t *testing.T, ctx context.Context, e *testStepEnv, previousRes testStepResult) testStepResult { t.Logf("------ resolve migration deprecations in %s ------", e.currentVersion()) err := e.kbc.ResolveMigrationDeprecations(ctx) require.NoError(t, err) return previousRes }