in internal/testrunner/runners/pipeline/coverage.go [21:104]
// getPipelineCoverage builds a coverage report for the given ingest pipelines
// from the statistics returned by ingest.GetPipelineStats.
//
// The report format is selected by options.CoverageType:
//   - "cobertura" returns a *testrunner.CoberturaCoverage holding a single
//     package named pkgName with one class per pipeline.
//   - "generic" returns a *testrunner.GenericCoverage with one file entry per
//     pipeline.
//
// Any other coverage type yields an error naming the rejected value. An error
// is also returned when the data stream root or the repository root cannot be
// located, when fetching the pipeline stats fails, or when coverage cannot be
// computed for any single pipeline; in every error case the report is nil.
func getPipelineCoverage(pkgName string, options PipelineTesterOptions, pipelines []ingest.Pipeline) (testrunner.CoverageReport, error) {
	dataStreamPath, found, err := packages.FindDataStreamRootForPath(options.TestFolder.Path)
	if err != nil {
		return nil, fmt.Errorf("locating data_stream root failed: %w", err)
	}
	if !found {
		return nil, errors.New("data stream root not found")
	}

	// Use the Node Stats API to get stats for all installed pipelines.
	// These stats contain hit counts for all main processors in a pipeline.
	stats, err := ingest.GetPipelineStats(options.API, pipelines)
	if err != nil {
		return nil, fmt.Errorf("error fetching pipeline stats for code coverage calculations: %w", err)
	}

	repositoryRootDir, err := files.FindRepositoryRootDirectory()
	if err != nil {
		// Wrapped (not replaced) so callers can still inspect the cause.
		return nil, fmt.Errorf("locating repository root failed: %w", err)
	}

	switch options.CoverageType {
	case "cobertura":
		// Use the package's parent directory as base path, so that the relative
		// paths for each class (pipeline) include the package name. This prevents
		// paths for different packages colliding (i.e. a lot of packages have a
		// "log" datastream and a default.yml pipeline).
		basePath := filepath.Dir(options.PackageRootPath)

		pkg := &testrunner.CoberturaPackage{
			Name: pkgName,
		}
		cobertura := &testrunner.CoberturaCoverage{
			Sources: []*testrunner.CoberturaSource{
				{
					Path: basePath,
				},
			},
			Packages:  []*testrunner.CoberturaPackage{pkg},
			Timestamp: time.Now().UnixNano(),
		}

		// Calculate coverage for each pipeline.
		for _, pipeline := range pipelines {
			pipelineName, pipelineRelPath, src, pstats, err := pipelineDataForCoverage(pipeline, stats, repositoryRootDir, dataStreamPath)
			if err != nil {
				return nil, err
			}

			covered, class, err := coberturaForSinglePipeline(pipelineName, pipelineRelPath, src, pstats)
			if err != nil {
				return nil, fmt.Errorf("error calculating coverage for pipeline '%s': %w", pipeline.Filename(), err)
			}

			pkg.Classes = append(pkg.Classes, class)
			// The report totals are accumulated across pipelines: the number of
			// method entries of the class counts as valid lines.
			cobertura.LinesValid += int64(len(class.Methods))
			cobertura.LinesCovered += covered
		}
		return cobertura, nil

	case "generic":
		coverage := &testrunner.GenericCoverage{
			Version:   1,
			Timestamp: time.Now().UnixNano(),
			// NOTE(review): the label mentions "Cobertura" although this is the
			// generic report; kept as-is, confirm whether that is intentional.
			TestType: "Cobertura for pipeline test",
		}

		// Calculate coverage for each pipeline.
		for _, pipeline := range pipelines {
			_, pipelineRelPath, src, pstats, err := pipelineDataForCoverage(pipeline, stats, repositoryRootDir, dataStreamPath)
			if err != nil {
				return nil, err
			}

			_, file, err := genericCoverageForSinglePipeline(pipelineRelPath, src, pstats)
			if err != nil {
				return nil, fmt.Errorf("error calculating coverage for pipeline '%s': %w", pipeline.Filename(), err)
			}

			coverage.Files = append(coverage.Files, file)
		}
		return coverage, nil

	default:
		// Name the rejected value so a misconfiguration can be diagnosed.
		return nil, fmt.Errorf("unrecognised coverage type %q", options.CoverageType)
	}
}