// internal/testrunner/runners/pipeline/tester.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package pipeline
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"slices"
"strings"
"time"
"github.com/Masterminds/semver/v3"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-package/internal/common"
"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
"github.com/elastic/elastic-package/internal/environment"
"github.com/elastic/elastic-package/internal/fields"
"github.com/elastic/elastic-package/internal/formatter"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/multierror"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/stack"
"github.com/elastic/elastic-package/internal/testrunner"
)
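// serverlessDisableCompareResults holds the name of the environment variable that, when set
// to "true", disables comparing simulated pipeline results against the expected ones on
// serverless stacks.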
var serverlessDisableCompareResults = environment.WithElasticPackagePrefix("SERVERLESS_PIPELINE_TEST_DISABLE_COMPARE_RESULTS")
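// tester runs the pipeline tests defined for a single test case file of a data stream.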
type tester struct {
profile *profile.Profile
deferCleanup time.Duration
esAPI *elasticsearch.API
packageRootPath string
testFolder testrunner.TestFolder
generateTestResult bool
withCoverage bool
coverageType string
globalTestConfig testrunner.GlobalRunnerTestConfig
testCaseFile string
pipelines []ingest.Pipeline
runCompareResults bool
provider stack.Provider
}
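// PipelineTesterOptions defines the options required to create a new pipeline tester.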
type PipelineTesterOptions struct {
Profile *profile.Profile
DeferCleanup time.Duration
API *elasticsearch.API
PackageRootPath string
TestFolder testrunner.TestFolder
GenerateTestResult bool
WithCoverage bool
CoverageType string
TestCaseFile string
GlobalTestConfig testrunner.GlobalRunnerTestConfig
}
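// NewPipelineTester creates a tester that runs pipeline tests for a single test case file
// against the given Elasticsearch API. A minimal usage sketch, assuming the profile, API
// client, package root, test folder, and test case file are prepared by the caller:
//
//	tester, err := NewPipelineTester(PipelineTesterOptions{
//		Profile:         profile,
//		API:             esAPI,
//		PackageRootPath: packageRootPath,
//		TestFolder:      testFolder,
//		TestCaseFile:    testCaseFile,
//	})
//	if err != nil {
//		return err
//	}
//	defer tester.TearDown(ctx)
//	results, err := tester.Run(ctx)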
func NewPipelineTester(options PipelineTesterOptions) (*tester, error) {
r := tester{
profile: options.Profile,
packageRootPath: options.PackageRootPath,
esAPI: options.API,
deferCleanup: options.DeferCleanup,
testFolder: options.TestFolder,
testCaseFile: options.TestCaseFile,
generateTestResult: options.GenerateTestResult,
withCoverage: options.WithCoverage,
coverageType: options.CoverageType,
globalTestConfig: options.GlobalTestConfig,
}
stackConfig, err := stack.LoadConfig(r.profile)
if err != nil {
return nil, err
}
provider, err := stack.BuildProvider(stackConfig.Provider, r.profile)
if err != nil {
return nil, fmt.Errorf("failed to build stack provider: %w", err)
}
r.provider = provider
r.runCompareResults = true
if stackConfig.Provider == stack.ProviderServerless {
r.runCompareResults = true
v, ok := os.LookupEnv(serverlessDisableCompareResults)
if ok && strings.ToLower(v) == "true" {
r.runCompareResults = false
}
}
if r.esAPI == nil {
return nil, errors.New("missing Elasticsearch client")
}
return &r, nil
}
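// IngestPipelineReroute is used to unmarshal ingest pipeline definitions in order to
// discover reroute processors and the datasets they can route documents to.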
type IngestPipelineReroute struct {
Description string `yaml:"description"`
Processors []map[string]ingest.RerouteProcessor `yaml:"processors"`
AdditionalFields map[string]interface{} `yaml:",inline"`
}
// Ensures that the tester implements the testrunner.Tester interface.
var _ testrunner.Tester = new(tester)
// Type returns the type of test that can be run by this test runner.
func (r *tester) Type() testrunner.TestType {
return TestType
}
// String returns the human-friendly name of the test runner.
func (r *tester) String() string {
return "pipeline"
}
// Parallel indicates if this tester can run in parallel or not.
func (r tester) Parallel() bool {
// Parallel pipeline tests are not supported yet, even if enabled in the global config (r.globalTestConfig).
return false
}
// Run runs the pipeline tests defined under the given folder
func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) {
return r.run(ctx)
}
// TearDown shuts down the pipeline test runner.
func (r *tester) TearDown(ctx context.Context) error {
if r.deferCleanup > 0 {
logger.Debugf("Waiting for %s before cleanup...", r.deferCleanup)
select {
case <-time.After(r.deferCleanup):
case <-ctx.Done():
}
}
if err := ingest.UninstallPipelines(ctx, r.esAPI, r.pipelines); err != nil {
return fmt.Errorf("uninstalling ingest pipelines failed: %w", err)
}
return nil
}
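// run installs the data stream ingest pipelines, determines the expected datasets (from
// reroute processors or the data stream manifest), executes the test case against the
// entry pipeline, and finally checks the Elasticsearch logs for ingest-related warnings.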
func (r *tester) run(ctx context.Context) ([]testrunner.TestResult, error) {
dataStreamPath, found, err := packages.FindDataStreamRootForPath(r.testFolder.Path)
if err != nil {
return nil, fmt.Errorf("locating data_stream root failed: %w", err)
}
if !found {
return nil, errors.New("data stream root not found")
}
startTesting := time.Now()
var entryPipeline string
entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.esAPI, dataStreamPath)
if err != nil {
return nil, fmt.Errorf("installing ingest pipelines failed: %w", err)
}
pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.packageRootPath)
if err != nil {
return nil, fmt.Errorf("failed to read manifest: %w", err)
}
dsManifest, err := packages.ReadDataStreamManifestFromPackageRoot(r.packageRootPath, r.testFolder.DataStream)
if err != nil {
return nil, fmt.Errorf("failed to read data stream manifest: %w", err)
}
// When reroute processors are used, expectedDatasets should be set depending on the processor configuration.
var expectedDatasets []string
for _, pipeline := range r.pipelines {
var esIngestPipeline IngestPipelineReroute
err = yaml.Unmarshal(pipeline.Content, &esIngestPipeline)
if err != nil {
return nil, fmt.Errorf("unmarshalling ingest pipeline content failed: %w", err)
}
for _, processor := range esIngestPipeline.Processors {
if reroute, ok := processor["reroute"]; ok {
expectedDatasets = append(expectedDatasets, reroute.Dataset...)
}
}
}
if len(expectedDatasets) == 0 {
expectedDataset := dsManifest.Dataset
if expectedDataset == "" {
expectedDataset = pkgManifest.Name + "." + r.testFolder.DataStream
}
expectedDatasets = []string{expectedDataset}
}
results := make([]testrunner.TestResult, 0)
validatorOptions := []fields.ValidatorOption{
fields.WithSpecVersion(pkgManifest.SpecVersion),
// explicitly enabled for pipeline tests only
// since system tests can have dynamic public IPs
fields.WithEnabledAllowedIPCheck(),
fields.WithExpectedDatasets(expectedDatasets),
fields.WithEnabledImportAllECSSChema(true),
}
result, err := r.runTestCase(ctx, r.testCaseFile, dataStreamPath, dsManifest.Type, entryPipeline, validatorOptions)
if err != nil {
return nil, err
}
results = append(results, result...)
esLogs, err := r.checkElasticsearchLogs(ctx, startTesting)
if err != nil {
return nil, err
}
results = append(results, esLogs...)
return results, nil
}
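// checkElasticsearchLogs dumps the Elasticsearch service logs written since the test started
// and reports processor-related WARN messages as a failing test result. It is a no-op when
// the stack provider cannot report status or does not support dumping logs.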
func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.Time) ([]testrunner.TestResult, error) {
startTime := time.Now()
testingTime := startTesting.Truncate(time.Second)
statusOptions := stack.Options{
Profile: r.profile,
}
_, err := r.provider.Status(ctx, statusOptions)
if err != nil {
logger.Debugf("Not checking Elasticsearch logs: %s", err)
return nil, nil
}
dumpOptions := stack.DumpOptions{
Profile: r.profile,
Services: []string{"elasticsearch"},
Since: testingTime,
}
dump, err := r.provider.Dump(ctx, dumpOptions)
var notImplementedErr *stack.ErrNotImplemented
if errors.As(err, &notImplementedErr) || errors.Is(err, stack.ErrUnavailableStack) {
logger.Debugf("Not checking Elasticsearch logs: %s", err)
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("error at getting the logs of elasticsearch: %w", err)
}
if len(dump) != 1 || dump[0].ServiceName != "elasticsearch" {
return nil, errors.New("expected elasticsearch logs in dump")
}
elasticsearchLogs := dump[0].Logs
seenWarnings := make(map[string]any)
var processorRelatedWarnings []string
err = stack.ParseLogsFromReader(bytes.NewReader(elasticsearchLogs), stack.ParseLogsOptions{
StartTime: testingTime,
}, func(log stack.LogLine) error {
if log.LogLevel != "WARN" {
return nil
}
if _, exists := seenWarnings[log.Message]; exists {
return nil
}
seenWarnings[log.Message] = struct{}{}
logger.Warnf("elasticsearch warning: %s", log.Message)
// Try to catch only warnings related to processors; this is best-effort.
if strings.Contains(strings.ToLower(log.Logger), "processor") {
processorRelatedWarnings = append(processorRelatedWarnings, log.Message)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error at parsing logs of elasticseach: %w", err)
}
tr := testrunner.TestResult{
TestType: TestType,
Name: fmt.Sprintf("(ingest pipeline warnings %s)", r.testCaseFile),
Package: r.testFolder.Package,
DataStream: r.testFolder.DataStream,
TimeElapsed: time.Since(startTime),
}
if totalProcessorWarnings := len(processorRelatedWarnings); totalProcessorWarnings > 0 {
tr.FailureMsg = fmt.Sprintf("detected ingest pipeline warnings: %d", totalProcessorWarnings)
tr.FailureDetails = strings.Join(processorRelatedWarnings, "\n")
}
return []testrunner.TestResult{tr}, nil
}
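// runTestCase loads a single test case file, simulates the ingest pipeline on its events,
// verifies the processed documents against the expected results and field definitions, and
// optionally collects pipeline coverage.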
func (r *tester) runTestCase(ctx context.Context, testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) ([]testrunner.TestResult, error) {
rc := testrunner.NewResultComposer(testrunner.TestResult{
TestType: TestType,
Package: r.testFolder.Package,
DataStream: r.testFolder.DataStream,
})
startTime := time.Now()
tc, err := loadTestCaseFile(r.testFolder.Path, testCaseFile)
if err != nil {
results, _ := rc.WithErrorf("loading test case failed: %w", err)
return results, nil
}
rc.Name = tc.name
if skip := testrunner.AnySkipConfig(tc.config.Skip, r.globalTestConfig.Skip); skip != nil {
logger.Warnf("skipping %s test for %s/%s: %s (details: %s)",
TestType, r.testFolder.Package, r.testFolder.DataStream,
skip.Reason, skip.Link)
results, _ := rc.WithSkip(skip)
return results, nil
}
simulateDataStream := dsType + "-" + r.testFolder.Package + "." + r.testFolder.DataStream + "-default"
processedEvents, err := ingest.SimulatePipeline(ctx, r.esAPI, pipeline, tc.events, simulateDataStream)
if err != nil {
results, _ := rc.WithErrorf("simulating pipeline processing failed: %w", err)
return results, nil
}
result := &testResult{events: processedEvents}
rc.TimeElapsed = time.Since(startTime)
validatorOptions = append(slices.Clone(validatorOptions),
fields.WithNumericKeywordFields(tc.config.NumericKeywordFields),
fields.WithStringNumberFields(tc.config.StringNumberFields),
)
fieldsValidator, err := fields.CreateValidatorForDirectory(dsPath, validatorOptions...)
if err != nil {
return rc.WithErrorf("creating fields validator for data stream failed (path: %s, test case file: %s): %w", dsPath, testCaseFile, err)
}
err = r.verifyResults(testCaseFile, tc.config, result, fieldsValidator)
if err != nil {
results, _ := rc.WithErrorf("verifying test result failed: %w", err)
return results, nil
}
if r.withCoverage {
options := PipelineTesterOptions{
TestFolder: r.testFolder,
API: r.esAPI,
PackageRootPath: r.packageRootPath,
CoverageType: r.coverageType,
}
rc.Coverage, err = getPipelineCoverage(rc.CoveragePackageName(), options, r.pipelines)
if err != nil {
return rc.WithErrorf("error calculating pipeline coverage: %w", err)
}
}
return rc.WithSuccess()
}
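// loadTestCaseFile reads a test case and its configuration from the test folder. JSON files
// are treated as lists of events, while log files are treated as raw input lines.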
func loadTestCaseFile(testFolderPath, testCaseFile string) (*testCase, error) {
testCasePath := filepath.Join(testFolderPath, testCaseFile)
testCaseData, err := os.ReadFile(testCasePath)
if err != nil {
return nil, fmt.Errorf("reading input file failed (testCasePath: %s): %w", testCasePath, err)
}
config, err := readConfigForTestCase(testCasePath)
if err != nil {
return nil, fmt.Errorf("reading config for test case failed (testCasePath: %s): %w", testCasePath, err)
}
if config.Skip != nil {
return &testCase{
name: testCaseFile,
config: config,
}, nil
}
ext := filepath.Ext(testCaseFile)
var entries []json.RawMessage
switch ext {
case ".json":
entries, err = readTestCaseEntriesForEvents(testCaseData)
if err != nil {
return nil, fmt.Errorf("reading test case entries for events failed (testCasePath: %s): %w", testCasePath, err)
}
case ".log":
entries, err = readTestCaseEntriesForRawInput(testCaseData, config)
if err != nil {
return nil, fmt.Errorf("creating test case entries for raw input failed (testCasePath: %s): %w", testCasePath, err)
}
default:
return nil, fmt.Errorf("unsupported extension for test case file (ext: %s)", ext)
}
tc, err := createTestCase(testCaseFile, entries, config)
if err != nil {
return nil, fmt.Errorf("can't create test case (testCasePath: %s): %w", testCasePath, err)
}
return tc, nil
}
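// verifyResults optionally regenerates the expected results file, compares the processed
// documents with the expected ones, and validates dynamic fields as well as the field
// definitions of the data stream.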
func (r *tester) verifyResults(testCaseFile string, config *testConfig, result *testResult, fieldsValidator *fields.Validator) error {
testCasePath := filepath.Join(r.testFolder.Path, testCaseFile)
manifest, err := packages.ReadPackageManifestFromPackageRoot(r.packageRootPath)
if err != nil {
return fmt.Errorf("failed to read package manifest: %w", err)
}
specVersion, err := semver.NewVersion(manifest.SpecVersion)
if err != nil {
return fmt.Errorf("failed to parse package format version %q: %w", manifest.SpecVersion, err)
}
if r.generateTestResult {
err := writeTestResult(testCasePath, result, *specVersion)
if err != nil {
return fmt.Errorf("writing test result failed: %w", err)
}
}
// TODO: temporary workaround until another approach for deterministic geoip in serverless can be implemented.
if r.runCompareResults {
err = compareResults(testCasePath, config, result, *specVersion)
if _, ok := err.(testrunner.ErrTestCaseFailed); ok {
return err
}
if err != nil {
return fmt.Errorf("comparing test results failed: %w", err)
}
}
result = stripEmptyTestResults(result)
err = verifyDynamicFields(result, config)
if err != nil {
return err
}
err = verifyFieldsInTestResult(result, fieldsValidator)
if err != nil {
return err
}
return nil
}
// stripEmptyTestResults removes events that are nil. These nils can represent
// documents processed by a pipeline that used a "drop" processor (to drop the event entirely).
func stripEmptyTestResults(result *testResult) *testResult {
var tr testResult
for _, event := range result.events {
if event == nil {
continue
}
tr.events = append(tr.events, event)
}
return &tr
}
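// verifyDynamicFields checks that the string values of fields configured as dynamic match
// the regular expressions defined for them in the test configuration.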
func verifyDynamicFields(result *testResult, config *testConfig) error {
if config == nil || config.DynamicFields == nil {
return nil
}
var multiErr multierror.Error
for _, event := range result.events {
var m common.MapStr
err := formatter.JSONUnmarshalUsingNumber(event, &m)
if err != nil {
return fmt.Errorf("can't unmarshal event: %w", err)
}
for key, pattern := range config.DynamicFields {
val, err := m.GetValue(key)
if err != nil && err != common.ErrKeyNotFound {
return fmt.Errorf("can't remove dynamic field: %w", err)
}
valStr, ok := val.(string)
if !ok {
continue // regular expressions can only verify string values
}
matched, err := regexp.MatchString(pattern, valStr)
if err != nil {
return fmt.Errorf("pattern matching for dynamic field failed: %w", err)
}
if !matched {
multiErr = append(multiErr, fmt.Errorf("dynamic field \"%s\" doesn't match the pattern (%s): %s",
key, pattern, valStr))
}
}
}
if len(multiErr) > 0 {
return testrunner.ErrTestCaseFailed{
Reason: "one or more problems with dynamic fields found in documents",
Details: multiErr.Unique().Error(),
}
}
return nil
}
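// verifyFieldsInTestResult validates every processed document against the field definitions,
// skipping further validation of documents that already contain a pipeline error.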
func verifyFieldsInTestResult(result *testResult, fieldsValidator *fields.Validator) error {
var multiErr multierror.Error
for _, event := range result.events {
err := checkErrorMessage(event)
if err != nil {
multiErr = append(multiErr, err)
continue // all fields can be wrong, no need to validate them
}
errs := fieldsValidator.ValidateDocumentBody(event)
if errs != nil {
multiErr = append(multiErr, errs...)
}
}
if len(multiErr) > 0 {
return testrunner.ErrTestCaseFailed{
Reason: "one or more problems with fields found in documents",
Details: multiErr.Unique().Error(),
}
}
return nil
}
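// checkErrorMessage returns an error if the processed event contains a non-empty
// error.message field, which indicates that the ingest pipeline failed for this event.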
func checkErrorMessage(event json.RawMessage) error {
var pipelineError struct {
Error struct {
Message interface{}
}
}
err := formatter.JSONUnmarshalUsingNumber(event, &pipelineError)
if err != nil {
return fmt.Errorf("can't unmarshal event to check pipeline error: %#q: %w", event, err)
}
switch m := pipelineError.Error.Message.(type) {
case nil:
return nil
case string, []string:
return fmt.Errorf("unexpected pipeline error: %s", m)
case []interface{}:
for i, v := range m {
switch v.(type) {
case string:
break
default:
return fmt.Errorf("unexpected pipeline error (unexpected error.message type %T at position %d): %v", v, i, m)
}
}
return fmt.Errorf("unexpected pipeline error: %s", m)
default:
return fmt.Errorf("unexpected pipeline error (unexpected error.message type %T): %[1]v", m)
}
}