internal/benchrunner/runners/pipeline/runner.go (180 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package pipeline
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"github.com/elastic/elastic-package/internal/benchrunner"
"github.com/elastic/elastic-package/internal/benchrunner/reporters"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/testrunner"
)
// BenchType defining pipeline benchmarks.
const BenchType benchrunner.Type = "pipeline"

// File name suffixes of pipeline test artifacts. A benchmark folder may also
// hold pipeline tests, so files ending in one of these are not benchmark input.
const (
	// configTestSuffixYAML ends the name of a pipeline test's config file.
	configTestSuffixYAML = "-config.yml"
	// expectedTestResultSuffix ends the name of a pipeline test's expected-result file.
	expectedTestResultSuffix = "-expected.json"
)
// runner holds the state of a single pipeline benchmark run.
type runner struct {
	// options is the configuration the runner was created with.
	options Options
	// entryPipeline is the entry pipeline reported by
	// ingest.InstallDataStreamPipelines during SetUp; run benchmarks against it.
	entryPipeline string
	// pipelines are the pipelines installed during SetUp; TearDown uninstalls them.
	pipelines []ingest.Pipeline
}
// NewPipelineBenchmark creates a pipeline benchmark runner configured with opts.
// No pipelines are installed until SetUp is called.
func NewPipelineBenchmark(opts Options) benchrunner.Runner {
	r := runner{options: opts}
	return &r
}
// SetUp locates the data stream root that contains the benchmark folder and
// installs that data stream's ingest pipelines, recording the entry pipeline
// and the installed pipelines on the runner.
func (r *runner) SetUp(ctx context.Context) error {
	root, ok, err := packages.FindDataStreamRootForPath(r.options.Folder.Path)
	switch {
	case err != nil:
		return fmt.Errorf("locating data_stream root failed: %w", err)
	case !ok:
		return errors.New("data stream root not found")
	}

	entry, installed, err := ingest.InstallDataStreamPipelines(ctx, r.options.API, root)
	// Store the results before checking the error so the runner always
	// reflects whatever the installer returned.
	r.entryPipeline, r.pipelines = entry, installed
	if err != nil {
		return fmt.Errorf("installing ingest pipelines failed: %w", err)
	}
	return nil
}
// TearDown shuts down the pipeline benchmark runner by uninstalling the
// ingest pipelines recorded on the runner.
func (r *runner) TearDown(ctx context.Context) error {
	err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines)
	if err == nil {
		return nil
	}
	return fmt.Errorf("uninstalling ingest pipelines failed: %w", err)
}
// Run runs the pipeline benchmarks defined under the configured folder and
// returns the resulting report.
func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) {
	report, err := r.run(ctx)
	return report, err
}
// run loads the benchmark input, executes it against the entry pipeline and
// formats the outcome according to the configured report format.
//
// The human format yields a report created with reporters.NewReport for the
// folder's package; every other format yields a file report whose file name
// comes from filenameByFormat.
func (r *runner) run(ctx context.Context) (reporters.Reportable, error) {
	input, err := r.loadBenchmark()
	if err != nil {
		return nil, fmt.Errorf("loading benchmark failed: %w", err)
	}

	result, err := r.benchmarkPipeline(ctx, input, r.entryPipeline)
	if err != nil {
		return nil, err
	}

	format := r.options.Format
	formatted, err := formatResult(format, result)
	if err != nil {
		return nil, err
	}

	if format == ReportFormatHuman {
		return reporters.NewReport(r.options.Folder.Package, formatted), nil
	}

	name := r.options.BenchName
	return reporters.NewFileReport(name, filenameByFormat(name, format), formatted), nil
}
// FindBenchmarkFolders finds the pipeline benchmark folders of the package
// rooted at packageRootPath.
//
// When dataStreams is non-empty, only those data streams are searched, in
// lexical order (the slice is sorted in place). When it is empty, every data
// stream of the package is searched.
//
// Expected folder structure:
//
//	<packageRootPath>/
//	  data_stream/
//	    <dataStream>/
//	      _dev/
//	        benchmark/
//	          pipeline/
//
// It returns one testrunner.TestFolder per folder found, or the first error
// reported while searching.
func FindBenchmarkFolders(packageRootPath string, dataStreams []string) ([]testrunner.TestFolder, error) {
	var paths []string
	if len(dataStreams) > 0 {
		sort.Strings(dataStreams)
		for _, dataStream := range dataStreams {
			p, err := findBenchFolderPaths(packageRootPath, dataStream)
			if err != nil {
				return nil, err
			}
			paths = append(paths, p...)
		}
	} else {
		p, err := findBenchFolderPaths(packageRootPath, "*")
		if err != nil {
			return nil, err
		}
		paths = p
	}

	_, pkg := filepath.Split(packageRootPath)
	folders := make([]testrunner.TestFolder, 0, len(paths))
	for _, p := range paths {
		// Every match has the shape
		// .../data_stream/<dataStream>/_dev/benchmark/pipeline, so the data
		// stream is the directory three levels above the match. Deriving it
		// from the match itself keeps the result correct regardless of how
		// packageRootPath was spelled (trailing separator, "./" prefix, ...).
		dataStream := filepath.Base(filepath.Dir(filepath.Dir(filepath.Dir(p))))
		folders = append(folders, testrunner.TestFolder{
			Path:       p,
			Package:    pkg,
			DataStream: dataStream,
		})
	}
	return folders, nil
}
// listBenchmarkFiles returns the names (not full paths) of the entries in the
// benchmark folder that should be used as benchmark input, in the order
// os.ReadDir reports them.
func (r *runner) listBenchmarkFiles() ([]string, error) {
	dir := r.options.Folder.Path
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("reading pipeline benchmarks failed (path: %s): %w", dir, err)
	}

	var names []string
	for _, entry := range entries {
		name := entry.Name()
		switch {
		case name == configYAML:
			// Skipped: the entry named by configYAML is not benchmark input
			// (presumably the benchmark's own config file; verify against its definition).
		case strings.HasSuffix(name, expectedTestResultSuffix),
			strings.HasSuffix(name, configTestSuffixYAML):
			// Skipped: pipeline tests may live in the same folder, and their
			// expected-result and config files are not benchmark input.
		default:
			names = append(names, name)
		}
	}
	return names, nil
}
// loadBenchmark reads every benchmark input file in the benchmark folder,
// concatenates their entries and combines them with the folder's config into
// a benchmark.
//
// Files with a ".json" extension are parsed with readBenchmarkEntriesForEvents
// and ".log" files with readBenchmarkEntriesForRawInput; any other extension
// is an error.
func (r *runner) loadBenchmark() (*benchmark, error) {
	folder := r.options.Folder.Path

	names, err := r.listBenchmarkFiles()
	if err != nil {
		return nil, err
	}

	var collected []json.RawMessage
	for _, name := range names {
		inputPath := filepath.Join(folder, name)
		// The file is read before its extension is inspected, so a read
		// failure is reported ahead of an unsupported extension.
		data, err := os.ReadFile(inputPath)
		if err != nil {
			return nil, fmt.Errorf("reading input file failed (benchPath: %s): %w", inputPath, err)
		}

		var parsed []json.RawMessage
		switch ext := filepath.Ext(name); ext {
		case ".json":
			if parsed, err = readBenchmarkEntriesForEvents(data); err != nil {
				return nil, fmt.Errorf("reading benchmark case entries for events failed (benchmarkPath: %s): %w", inputPath, err)
			}
		case ".log":
			if parsed, err = readBenchmarkEntriesForRawInput(data); err != nil {
				return nil, fmt.Errorf("creating benchmark case entries for raw input failed (benchmarkPath: %s): %w", inputPath, err)
			}
		default:
			return nil, fmt.Errorf("unsupported extension for benchmark case file (ext: %s)", ext)
		}
		collected = append(collected, parsed...)
	}

	cfg, err := readConfig(folder)
	if err != nil {
		return nil, fmt.Errorf("reading config for benchmark failed (benchPath: %s): %w", folder, err)
	}

	bench, err := createBenchmark(collected, cfg)
	if err != nil {
		return nil, fmt.Errorf("can't create benchmark case (benchmarkPath: %s): %w", folder, err)
	}
	return bench, nil
}
// findBenchFolderPaths returns the pipeline benchmark folders located at
// <packageRootPath>/data_stream/<dataStreamGlob>/_dev/benchmark/pipeline.
// dataStreamGlob may be a literal data stream name or a glob pattern such as
// "*"; benchmarks are only looked up at the data stream level.
//
// A pattern that matches nothing yields no paths and no error; an error is
// returned only when filepath.Glob rejects the pattern.
func findBenchFolderPaths(packageRootPath, dataStreamGlob string) ([]string, error) {
	pattern := filepath.Join(
		packageRootPath,
		"data_stream",
		dataStreamGlob,
		"_dev",
		"benchmark",
		"pipeline",
	)

	matches, globErr := filepath.Glob(pattern)
	if globErr != nil {
		return nil, fmt.Errorf("error finding benchmark folders: %w", globErr)
	}
	return matches, nil
}