func()

in internal/testrunner/runners/system/tester.go [1541:1685]


func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.ResultComposer, scenario *scenarioTest, config *testConfig) ([]testrunner.TestResult, error) {
	// Validate fields in docs
	// when reroute processors are used, expectedDatasets should be set depends on the processor config
	var expectedDatasets []string
	for _, pipeline := range r.pipelines {
		var esIngestPipeline map[string]any
		err := yaml.Unmarshal(pipeline.Content, &esIngestPipeline)
		if err != nil {
			return nil, fmt.Errorf("unmarshalling ingest pipeline content failed: %w", err)
		}
		processors, _ := esIngestPipeline["processors"].([]any)
		for _, p := range processors {
			processor, ok := p.(map[string]any)
			if !ok {
				return nil, fmt.Errorf("unexpected processor %+v", p)
			}
			if reroute, ok := processor["reroute"]; ok {
				if rerouteP, ok := reroute.(ingest.RerouteProcessor); ok {
					expectedDatasets = append(expectedDatasets, rerouteP.Dataset...)
				}
			}
		}
	}

	if expectedDatasets == nil {
		var expectedDataset string
		if ds := r.testFolder.DataStream; ds != "" {
			expectedDataset = getDataStreamDataset(*r.pkgManifest, *r.dataStreamManifest)
		} else {
			expectedDataset = r.pkgManifest.Name + "." + scenario.policyTemplateName
		}
		expectedDatasets = []string{expectedDataset}
	}
	if r.pkgManifest.Type == "input" {
		v, _ := config.Vars.GetValue("data_stream.dataset")
		if dataset, ok := v.(string); ok && dataset != "" {
			expectedDatasets = append(expectedDatasets, dataset)
		}
	}

	fieldsValidator, err := fields.CreateValidatorForDirectory(r.dataStreamPath,
		fields.WithSpecVersion(r.pkgManifest.SpecVersion),
		fields.WithNumericKeywordFields(config.NumericKeywordFields),
		fields.WithStringNumberFields(config.StringNumberFields),
		fields.WithExpectedDatasets(expectedDatasets),
		fields.WithEnabledImportAllECSSChema(true),
		fields.WithDisableNormalization(scenario.syntheticEnabled),
	)
	if err != nil {
		return result.WithErrorf("creating fields validator for data stream failed (path: %s): %w", r.dataStreamPath, err)
	}

	if errs := validateFields(scenario.docs, fieldsValidator); len(errs) > 0 {
		return result.WithError(testrunner.ErrTestCaseFailed{
			Reason:  fmt.Sprintf("one or more errors found in documents stored in %s data stream", scenario.dataStream),
			Details: errs.Error(),
		})
	}

	if r.fieldValidationMethod == mappingsMethod {
		logger.Debug("Performing validation based on mappings")
		exceptionFields := listExceptionFields(scenario.docs, fieldsValidator)

		mappingsValidator, err := fields.CreateValidatorForMappings(r.esClient,
			fields.WithMappingValidatorFallbackSchema(fieldsValidator.Schema),
			fields.WithMappingValidatorIndexTemplate(scenario.indexTemplateName),
			fields.WithMappingValidatorDataStream(scenario.dataStream),
			fields.WithMappingValidatorExceptionFields(exceptionFields),
		)
		if err != nil {
			return result.WithErrorf("creating mappings validator for data stream failed (data stream: %s): %w", scenario.dataStream, err)
		}

		if errs := validateMappings(ctx, mappingsValidator); len(errs) > 0 {
			return result.WithError(testrunner.ErrTestCaseFailed{
				Reason:  fmt.Sprintf("one or more errors found in mappings in %s index template", scenario.indexTemplateName),
				Details: errs.Error(),
			})
		}
	}

	stackVersion, err := semver.NewVersion(r.stackVersion.Number)
	if err != nil {
		return result.WithErrorf("failed to parse stack version: %w", err)
	}

	err = validateIgnoredFields(stackVersion, scenario, config)
	if err != nil {
		return result.WithError(err)
	}

	docs := scenario.docs
	if scenario.syntheticEnabled {
		docs, err = fieldsValidator.SanitizeSyntheticSourceDocs(scenario.docs)
		if err != nil {
			results, _ := result.WithErrorf("failed to sanitize synthetic source docs: %w", err)
			return results, nil
		}
	}

	specVersion, err := semver.NewVersion(r.pkgManifest.SpecVersion)
	if err != nil {
		return result.WithErrorf("failed to parse format version %q: %w", r.pkgManifest.SpecVersion, err)
	}

	// Write sample events file from first doc, if requested
	if err := r.generateTestResultFile(docs, *specVersion); err != nil {
		return result.WithError(err)
	}

	// Check Hit Count within docs, if 0 then it has not been specified
	if assertionPass, message := assertHitCount(config.Assert.HitCount, docs); !assertionPass {
		result.FailureMsg = message
	}

	// Check transforms if present
	if err := r.checkTransforms(ctx, config, r.pkgManifest, scenario.kibanaDataStream, scenario.dataStream, scenario.syntheticEnabled); err != nil {
		results, _ := result.WithError(err)
		return results, nil
	}

	if scenario.agent != nil {
		logResults, err := r.checkNewAgentLogs(ctx, scenario.agent, scenario.startTestTime, errorPatterns, config.Name())
		if err != nil {
			return result.WithError(err)
		}
		if len(logResults) > 0 {
			return logResults, nil
		}
	}

	if results := r.checkDeprecationWarnings(stackVersion, scenario.dataStream, scenario.deprecationWarnings, config.Name()); len(results) > 0 {
		return results, nil
	}

	if r.withCoverage {
		coverage, err := r.generateCoverageReport(result.CoveragePackageName())
		if err != nil {
			return result.WithErrorf("coverage report generation failed: %w", err)
		}
		result = result.WithCoverage(coverage)
	}

	return result.WithSuccess()
}