in internal/testrunner/runners/system/tester.go [1541:1685]
func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.ResultComposer, scenario *scenarioTest, config *testConfig) ([]testrunner.TestResult, error) {
// Validate fields in docs
// when reroute processors are used, expectedDatasets should be set depends on the processor config
var expectedDatasets []string
for _, pipeline := range r.pipelines {
var esIngestPipeline map[string]any
err := yaml.Unmarshal(pipeline.Content, &esIngestPipeline)
if err != nil {
return nil, fmt.Errorf("unmarshalling ingest pipeline content failed: %w", err)
}
processors, _ := esIngestPipeline["processors"].([]any)
for _, p := range processors {
processor, ok := p.(map[string]any)
if !ok {
return nil, fmt.Errorf("unexpected processor %+v", p)
}
if reroute, ok := processor["reroute"]; ok {
if rerouteP, ok := reroute.(ingest.RerouteProcessor); ok {
expectedDatasets = append(expectedDatasets, rerouteP.Dataset...)
}
}
}
}
if expectedDatasets == nil {
var expectedDataset string
if ds := r.testFolder.DataStream; ds != "" {
expectedDataset = getDataStreamDataset(*r.pkgManifest, *r.dataStreamManifest)
} else {
expectedDataset = r.pkgManifest.Name + "." + scenario.policyTemplateName
}
expectedDatasets = []string{expectedDataset}
}
if r.pkgManifest.Type == "input" {
v, _ := config.Vars.GetValue("data_stream.dataset")
if dataset, ok := v.(string); ok && dataset != "" {
expectedDatasets = append(expectedDatasets, dataset)
}
}
fieldsValidator, err := fields.CreateValidatorForDirectory(r.dataStreamPath,
fields.WithSpecVersion(r.pkgManifest.SpecVersion),
fields.WithNumericKeywordFields(config.NumericKeywordFields),
fields.WithStringNumberFields(config.StringNumberFields),
fields.WithExpectedDatasets(expectedDatasets),
fields.WithEnabledImportAllECSSChema(true),
fields.WithDisableNormalization(scenario.syntheticEnabled),
)
if err != nil {
return result.WithErrorf("creating fields validator for data stream failed (path: %s): %w", r.dataStreamPath, err)
}
if errs := validateFields(scenario.docs, fieldsValidator); len(errs) > 0 {
return result.WithError(testrunner.ErrTestCaseFailed{
Reason: fmt.Sprintf("one or more errors found in documents stored in %s data stream", scenario.dataStream),
Details: errs.Error(),
})
}
if r.fieldValidationMethod == mappingsMethod {
logger.Debug("Performing validation based on mappings")
exceptionFields := listExceptionFields(scenario.docs, fieldsValidator)
mappingsValidator, err := fields.CreateValidatorForMappings(r.esClient,
fields.WithMappingValidatorFallbackSchema(fieldsValidator.Schema),
fields.WithMappingValidatorIndexTemplate(scenario.indexTemplateName),
fields.WithMappingValidatorDataStream(scenario.dataStream),
fields.WithMappingValidatorExceptionFields(exceptionFields),
)
if err != nil {
return result.WithErrorf("creating mappings validator for data stream failed (data stream: %s): %w", scenario.dataStream, err)
}
if errs := validateMappings(ctx, mappingsValidator); len(errs) > 0 {
return result.WithError(testrunner.ErrTestCaseFailed{
Reason: fmt.Sprintf("one or more errors found in mappings in %s index template", scenario.indexTemplateName),
Details: errs.Error(),
})
}
}
stackVersion, err := semver.NewVersion(r.stackVersion.Number)
if err != nil {
return result.WithErrorf("failed to parse stack version: %w", err)
}
err = validateIgnoredFields(stackVersion, scenario, config)
if err != nil {
return result.WithError(err)
}
docs := scenario.docs
if scenario.syntheticEnabled {
docs, err = fieldsValidator.SanitizeSyntheticSourceDocs(scenario.docs)
if err != nil {
results, _ := result.WithErrorf("failed to sanitize synthetic source docs: %w", err)
return results, nil
}
}
specVersion, err := semver.NewVersion(r.pkgManifest.SpecVersion)
if err != nil {
return result.WithErrorf("failed to parse format version %q: %w", r.pkgManifest.SpecVersion, err)
}
// Write sample events file from first doc, if requested
if err := r.generateTestResultFile(docs, *specVersion); err != nil {
return result.WithError(err)
}
// Check Hit Count within docs, if 0 then it has not been specified
if assertionPass, message := assertHitCount(config.Assert.HitCount, docs); !assertionPass {
result.FailureMsg = message
}
// Check transforms if present
if err := r.checkTransforms(ctx, config, r.pkgManifest, scenario.kibanaDataStream, scenario.dataStream, scenario.syntheticEnabled); err != nil {
results, _ := result.WithError(err)
return results, nil
}
if scenario.agent != nil {
logResults, err := r.checkNewAgentLogs(ctx, scenario.agent, scenario.startTestTime, errorPatterns, config.Name())
if err != nil {
return result.WithError(err)
}
if len(logResults) > 0 {
return logResults, nil
}
}
if results := r.checkDeprecationWarnings(stackVersion, scenario.dataStream, scenario.deprecationWarnings, config.Name()); len(results) > 0 {
return results, nil
}
if r.withCoverage {
coverage, err := r.generateCoverageReport(result.CoveragePackageName())
if err != nil {
return result.WithErrorf("coverage report generation failed: %w", err)
}
result = result.WithCoverage(coverage)
}
return result.WithSuccess()
}