internal/testrunner/testrunner.go (360 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package testrunner
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/environment"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/profile"
)
var (
	// defaultMaximumRoutines is the number of tests run concurrently when the
	// environment variable named by maximumNumberParallelTest is not set: half
	// of GOMAXPROCS, but never less than one. Without the lower bound a
	// single-CPU host would yield zero, which makes the semaphore channel in
	// runSuiteParallel unbuffered and blocks the very first test forever.
	defaultMaximumRoutines = func() int {
		if half := runtime.GOMAXPROCS(0) / 2; half > 1 {
			return half
		}
		return 1
	}()

	// maximumNumberParallelTest is the name of the environment variable that
	// overrides defaultMaximumRoutines.
	maximumNumberParallelTest = environment.WithElasticPackagePrefix("MAXIMUM_NUMBER_PARALLEL_TESTS")
)
// TestType represents the various supported test types. FindTestFolders and
// AssumeTestFolders use the value as the name of the folder under _dev/test
// that holds the tests of that type.
type TestType string
// TestOptions contains test runner options.
//
// NOTE(review): no code in this file reads these fields, so the per-field
// notes below are inferred from the field names and types only and should be
// confirmed against the concrete test runners.
type TestOptions struct {
// Profile is presumably the elastic-package profile the tests run with.
Profile *profile.Profile
// TestFolder is presumably the folder containing the tests to run.
TestFolder TestFolder
// PackageRootPath is presumably the root directory of the package under test.
PackageRootPath string
// GenerateTestResult presumably asks runners to generate expected results — TODO confirm.
GenerateTestResult bool
// API is an Elasticsearch API handle and KibanaClient a Kibana client handle.
API *elasticsearch.API
KibanaClient *kibana.Client
// DeferCleanup is presumably how long cleanup is postponed after a test — TODO confirm.
DeferCleanup time.Duration
// ServiceVariant presumably selects the variant of the service under test — TODO confirm.
ServiceVariant string
// WithCoverage and CoverageType presumably control whether, and in which
// format, coverage is collected — TODO confirm.
WithCoverage bool
CoverageType string
// ConfigFilePath presumably points at a test configuration file — TODO confirm.
ConfigFilePath string
// RunSetup, RunTearDown and RunTestsOnly presumably restrict a run to a
// single phase — TODO confirm.
RunSetup bool
RunTearDown bool
RunTestsOnly bool
}
// Tester is the interface all test runners must implement.
//
// Within this file, run calls Run and then TearDown on every Tester, and
// RunSuite uses Parallel to decide how each Tester is scheduled.
type Tester interface {
// Type returns the test runner's type.
Type() TestType
// String returns the human-friendly name of the test runner.
String() string
// Run executes the test runner.
Run(context.Context) ([]TestResult, error)
// TearDown cleans up any test runner resources. It must be called
// after the test runner has finished executing; run does so even when
// Run returned an error.
TearDown(context.Context) error
// Parallel indicates if this test can be run in parallel or not.
// RunSuite runs the Testers that return true concurrently and all the
// others one after another.
Parallel() bool
}
// TesterFactory is the signature of a function that builds a Tester for a
// given test folder, or returns an error when it cannot.
type TesterFactory func(TestFolder) (Tester, error)
// TestRunner is the interface test runners that require a global initialization must implement.
//
// RunSuite drives a TestRunner in this order: GetTests, SetupRunner, the
// returned Testers, then TearDownRunner.
type TestRunner interface {
// Type returns the test runner's type.
Type() TestType
// SetupRunner prepares global resources required by the test runner.
// RunSuite does not call it when GetTests returns no tests.
SetupRunner(context.Context) error
// GetTests returns the tests to execute. RunSuite calls it before
// SetupRunner.
GetTests(context.Context) ([]Tester, error)
// TearDownRunner cleans up any global test runner resources. It must be called
// after the test runner has finished executing all its tests. RunSuite also
// calls it when SetupRunner fails.
TearDownRunner(context.Context) error
}
// TestResult contains a single test's results.
//
// ResultComposer.WithError fills in TimeElapsed and, depending on the error it
// receives, either FailureMsg and FailureDetails or ErrorMsg.
type TestResult struct {
// Name of test result. Optional.
Name string
// Package to which this test result belongs.
Package string
// TestType indicates the type of test.
TestType TestType
// Data stream to which this test result belongs.
DataStream string
// Time elapsed from running a test case to arriving at its result.
TimeElapsed time.Duration
// If test case failed, short description of the failure. A failure is
// when the test completes execution but the actual results of the test
// don't match the expected results.
FailureMsg string
// If test case failed, longer description of the failure.
FailureDetails string
// If there was an error while running the test case, description
// of the error. An error is when the test cannot complete execution due
// to an unexpected runtime error in the test execution.
ErrorMsg string
// If the test was skipped, the reason it was skipped and a link for more
// details. Nil when the test was not skipped.
Skipped *SkipConfig
// Coverage details in Cobertura format (optional).
Coverage CoverageReport
}
// ResultComposer wraps a TestResult and provides convenience methods for
// manipulating this TestResult.
type ResultComposer struct {
// TestResult is the result being built. It is embedded, so its fields are
// accessible directly on the composer.
TestResult
// StartTime is the instant from which WithError measures
// TestResult.TimeElapsed.
StartTime time.Time
}
// NewResultComposer wraps tr in a ResultComposer whose StartTime is the
// moment of the call, so that elapsed time can later be measured from it.
func NewResultComposer(tr TestResult) *ResultComposer {
	composer := new(ResultComposer)
	composer.TestResult = tr
	composer.StartTime = time.Now()
	return composer
}
// WithCoverage stores the given coverage report on the wrapped test result and
// returns the composer itself so calls can be chained. Every result produced
// by the composer afterwards carries this report.
func (rc *ResultComposer) WithCoverage(coverage CoverageReport) *ResultComposer {
	wrapped := &rc.TestResult
	wrapped.Coverage = coverage
	return rc
}
// CoveragePackageName builds the package name to use in coverage reports:
// "<package>.<data stream>" when the result belongs to a data stream, and the
// bare package name otherwise.
func (rc *ResultComposer) CoveragePackageName() string {
	if rc.DataStream == "" {
		return rc.Package
	}
	return rc.Package + "." + rc.DataStream
}
// WithError finalizes the wrapped test result according to err and returns it
// as a one-element slice.
//
// TimeElapsed is always set to the time passed since StartTime. Then:
//   - a nil err leaves the result untouched and nil is returned;
//   - an err that is (or wraps) an ErrTestCaseFailed is recorded as a test
//     failure in FailureMsg and FailureDetails, and nil is returned;
//   - any other err is recorded in ErrorMsg and returned to the caller.
func (rc *ResultComposer) WithError(err error) ([]TestResult, error) {
	rc.TimeElapsed = time.Since(rc.StartTime)

	var failure ErrTestCaseFailed
	switch {
	case err == nil:
		// Nothing to record.
	case errors.As(err, &failure):
		rc.FailureMsg += failure.Error()
		rc.FailureDetails += failure.Details
		// A failed test case is a valid outcome, not an execution error.
		err = nil
	default:
		rc.ErrorMsg += err.Error()
	}

	return []TestResult{rc.TestResult}, err
}
// WithErrorf builds an error from the format string and arguments, as
// fmt.Errorf does, and records it on the wrapped test result via WithError.
func (rc *ResultComposer) WithErrorf(format string, a ...any) ([]TestResult, error) {
	formatted := fmt.Errorf(format, a...)
	return rc.WithError(formatted)
}
// WithSuccess finalizes the wrapped test result without recording any failure
// or error on it.
func (rc *ResultComposer) WithSuccess() ([]TestResult, error) {
	var none error
	return rc.WithError(none)
}
// WithSkip records s as the skip configuration of the wrapped test result and
// then finalizes the result without any failure or error.
func (rc *ResultComposer) WithSkip(s *SkipConfig) ([]TestResult, error) {
	wrapped := &rc.TestResult
	wrapped.Skipped = s
	return rc.WithError(nil)
}
// TestFolder encapsulates the test folder path and names of the package + data stream
// to which the test folder belongs.
type TestFolder struct {
// Path is the location of the test folder.
Path string
// Package is the name of the package, taken from the last element of the
// package root path.
Package string
// DataStream is the name of the data stream the folder belongs to.
// FindTestFolders leaves it empty for folders found at the package level.
DataStream string
}
// AssumeTestFolders returns the test folders that the given data streams would
// have for testType, without checking that those folders exist.
//
// The assumed layout is:
//
//	<packageRootPath>/
//	  data_stream/
//	    <dataStream>/
//	      _dev/test/<testType>
//
// When dataStreams is empty, every directory directly under
// <packageRootPath>/data_stream is used instead. If that directory does not
// exist the package has no data streams and an empty, non-nil slice is
// returned. Any other error reading the directory is returned wrapped.
func AssumeTestFolders(packageRootPath string, dataStreams []string, testType TestType) ([]TestFolder, error) {
	dataStreamsPath := filepath.Join(packageRootPath, "data_stream")

	if len(dataStreams) == 0 {
		entries, err := os.ReadDir(dataStreamsPath)
		switch {
		case errors.Is(err, os.ErrNotExist):
			// No data_stream directory: no data streams defined.
			return []TestFolder{}, nil
		case err != nil:
			return nil, fmt.Errorf("can't read directory (path: %s): %w", dataStreamsPath, err)
		}
		for _, entry := range entries {
			if entry.IsDir() {
				dataStreams = append(dataStreams, entry.Name())
			}
		}
	}

	pkgName := filepath.Base(packageRootPath)
	var folders []TestFolder
	for i := range dataStreams {
		name := dataStreams[i]
		folders = append(folders, TestFolder{
			Path:       filepath.Join(dataStreamsPath, name, "_dev", "test", string(testType)),
			Package:    pkgName,
			DataStream: name,
		})
	}
	return folders, nil
}
// FindTestFolders finds the existing test folders of a package, optionally
// restricted to a test type and to a set of data streams.
//
// Expected folder structure for packages with data streams (integration packages):
//
//	<packageRootPath>/
//	  data_stream/
//	    <dataStream>/
//	      _dev/
//	        test/
//	          <testType>/
//
// Expected folder structure for packages without data streams (input packages):
//
//	<packageRootPath>/
//	  _dev/
//	    test/
//	      <testType>/
//
// An empty testType matches every test type. When dataStreams is non-empty,
// only those data streams are searched, in sorted order; the caller's slice is
// not modified. When it is empty, all data streams are searched and, if none
// of them has tests, the package-level test folders are returned instead.
//
// The returned slice is never nil on success. An error is returned when a
// glob pattern built from the arguments is malformed.
func FindTestFolders(packageRootPath string, dataStreams []string, testType TestType) ([]TestFolder, error) {
	testTypeGlob := "*"
	if testType != "" {
		testTypeGlob = string(testType)
	}

	var paths []string
	if len(dataStreams) > 0 {
		// Sort a copy so the caller's slice keeps its original order.
		sorted := append([]string(nil), dataStreams...)
		sort.Strings(sorted)
		for _, dataStream := range sorted {
			p, err := findDataStreamTestFolderPaths(packageRootPath, dataStream, testTypeGlob)
			if err != nil {
				return nil, err
			}
			paths = append(paths, p...)
		}
	} else {
		// No datastreams specified, try to discover them.
		p, err := findDataStreamTestFolderPaths(packageRootPath, "*", testTypeGlob)
		if err != nil {
			return nil, err
		}

		// Look for tests at the package level, like for input packages.
		if len(p) == 0 {
			p, err = findPackageTestFolderPaths(packageRootPath, testTypeGlob)
			if err != nil {
				return nil, err
			}
		}
		paths = p
	}

	// filepath.Base, unlike filepath.Split, still yields the directory name
	// when packageRootPath ends with a separator; AssumeTestFolders derives the
	// package name the same way.
	pkg := filepath.Base(packageRootPath)

	folders := make([]TestFolder, 0, len(paths))
	for _, p := range paths {
		folders = append(folders, TestFolder{
			Path:       p,
			Package:    pkg,
			DataStream: ExtractDataStreamFromPath(p, packageRootPath),
		})
	}
	return folders, nil
}
// ExtractDataStreamFromPath returns the name of the data stream that fullPath
// belongs to, given the root directory of its package.
//
// A path belongs to a data stream when, relative to packageRootPath, it starts
// with "data_stream/<name>"; <name> is then returned. For any other path, such
// as a package-level test folder, the empty string is returned.
//
// Both paths are compared after lexical cleaning, so a trailing separator or a
// leading "./" on either of them does not affect the result. The empty string
// is also returned when fullPath cannot be made relative to packageRootPath
// (for example when only one of them is absolute).
func ExtractDataStreamFromPath(fullPath, packageRootPath string) string {
	relPath, err := filepath.Rel(packageRootPath, fullPath)
	if err != nil {
		return ""
	}

	parts := strings.Split(relPath, string(filepath.Separator))
	if len(parts) >= 2 && parts[0] == "data_stream" {
		return parts[1]
	}
	return ""
}
// RunSuite runs every test provided by runner and returns their results.
//
// The sequence is:
//  1. runner.GetTests is called; with no tests there is nothing to do and
//     (nil, nil) is returned without any setup.
//  2. runner.SetupRunner is called. If it fails, TearDownRunner is still called
//     as best-effort cleanup (its error is only logged) and the setup error is
//     returned.
//  3. Tests reporting Parallel() are run concurrently, then the remaining
//     tests are run one after another.
//  4. runner.TearDownRunner is called with a context that is detached from
//     cancellation, so cleanup still happens when ctx has been cancelled.
//
// The results gathered so far are returned even on error. The returned error
// combines, with errors.Join, the error of the parallel tests, the error of
// the sequential tests and the teardown error, so that a teardown failure does
// not hide test errors.
func RunSuite(ctx context.Context, runner TestRunner) ([]TestResult, error) {
	testers, err := runner.GetTests(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve tests: %w", err)
	}
	if len(testers) == 0 {
		return nil, nil
	}

	if err := runner.SetupRunner(ctx); err != nil {
		// Setup may have left resources behind: clean up, but report the
		// setup error as the cause.
		if tdErr := runner.TearDownRunner(context.WithoutCancel(ctx)); tdErr != nil {
			logger.Debugf("failed to tear down %s runner: %s", runner.Type(), tdErr)
		}
		return nil, fmt.Errorf("failed to setup %s runner: %w", runner.Type(), err)
	}

	var parallelTesters, sequentialTesters []Tester
	for _, tester := range testers {
		if tester.Parallel() {
			parallelTesters = append(parallelTesters, tester)
		} else {
			sequentialTesters = append(sequentialTesters, tester)
		}
	}

	parallelResults, parallelErr := runSuiteParallel(ctx, parallelTesters)
	sequentialResults, sequentialErr := runSuite(ctx, sequentialTesters)

	var allResults []TestResult
	allResults = append(allResults, parallelResults...)
	allResults = append(allResults, sequentialResults...)

	// Avoid cancellations during cleanup.
	var tearDownErr error
	if tdErr := runner.TearDownRunner(context.WithoutCancel(ctx)); tdErr != nil {
		tearDownErr = fmt.Errorf("failed to tear down %s runner: %w", runner.Type(), tdErr)
	}

	// errors.Join skips nil errors and returns nil when all of them are nil.
	return allResults, errors.Join(parallelErr, sequentialErr, tearDownErr)
}
func maxNumberRoutines() (int, error) {
var err error
maxRoutines := defaultMaximumRoutines
v, ok := os.LookupEnv(maximumNumberParallelTest)
if ok {
maxRoutines, err = strconv.Atoi(v)
if err != nil {
return 0, fmt.Errorf("failed to read number of maximum routines from environment variable: %w", err)
}
}
return maxRoutines, nil
}
// runSuite runs the given testers one after another, in order, and collects
// their results.
//
// It stops at the first tester whose run reports an error and returns the
// results collected from the testers before it together with that error.
// With no testers it returns (nil, nil) without logging anything.
func runSuite(ctx context.Context, testers []Tester) ([]TestResult, error) {
	if len(testers) == 0 {
		return nil, nil
	}

	logger.Debugf("Running tests sequentially")

	var collected []TestResult
	for i := range testers {
		current := testers[i]
		partial, runErr := run(ctx, current)
		if runErr != nil {
			return collected, fmt.Errorf("error running package %s tests: %w", current.Type(), runErr)
		}
		collected = append(collected, partial...)
	}
	return collected, nil
}
// runSuiteParallel runs the given testers concurrently, each in its own
// goroutine, with at most maxNumberRoutines() of them executing at a time.
//
// It waits for every tester to finish before returning. Results are gathered
// in the order in which the testers completed. The errors of all failed
// testers are combined with errors.Join and wrapped in a single error that
// names the type of the first tester. A tester whose turn comes after ctx is
// done is not run; the context error is recorded for it instead.
// With no testers it returns (nil, nil).
func runSuiteParallel(ctx context.Context, testers []Tester) ([]TestResult, error) {
	if len(testers) == 0 {
		return nil, nil
	}

	limit, err := maxNumberRoutines()
	if err != nil {
		return nil, err
	}

	type outcome struct {
		results []TestResult
		err     error
	}

	// One buffer slot per tester: a worker never blocks delivering its outcome.
	outcomes := make(chan outcome, len(testers))

	logger.Debugf("Running tests in parallel. Maximum routines to run in parallel: %d", limit)

	// Buffered channel used as a counting semaphore: acquiring a slot blocks
	// while `limit` tests are already executing.
	slots := make(chan int, limit)

	var pending sync.WaitGroup
	for i := range testers {
		current := testers[i]
		pending.Add(1)
		slots <- 1
		go func() {
			defer pending.Done()
			defer func() {
				<-slots
			}()

			if ctxErr := ctx.Err(); ctxErr != nil {
				logger.Errorf("context error: %s", context.Cause(ctx))
				outcomes <- outcome{nil, ctxErr}
				return
			}

			res, runErr := run(ctx, current)
			outcomes <- outcome{res, runErr}
		}()
	}

	pending.Wait()
	close(outcomes)
	close(slots)

	var (
		collected []TestResult
		joined    error
	)
	suiteType := testers[0].Type()
	for o := range outcomes {
		if o.err != nil {
			joined = errors.Join(joined, o.err)
		}
		collected = append(collected, o.results...)
	}

	if joined == nil {
		return collected, nil
	}
	return collected, fmt.Errorf("error running package %s tests: %w", suiteType, joined)
}
// run executes a single tester and then tears it down.
//
// TearDown is always called, whether or not Run succeeded, and receives a
// context detached from cancellation so that cleanup is still attempted when
// ctx has been cancelled (the same approach RunSuite takes for the runner's
// teardown).
//
// If Run fails, no results are returned and the error describes the failed
// run; a teardown error, if any, is joined to it rather than discarded. If
// only TearDown fails, the results are returned together with the teardown
// error.
func run(ctx context.Context, tester Tester) ([]TestResult, error) {
	results, err := tester.Run(ctx)

	tdErr := tester.TearDown(context.WithoutCancel(ctx))
	if tdErr != nil {
		tdErr = fmt.Errorf("could not teardown test runner: %w", tdErr)
	}

	if err != nil {
		runErr := fmt.Errorf("could not complete test run: %w", err)
		if tdErr != nil {
			return nil, errors.Join(runErr, tdErr)
		}
		return nil, runErr
	}
	if tdErr != nil {
		return results, tdErr
	}
	return results, nil
}
// findDataStreamTestFoldersPaths can only be called for test runners that require tests to be defined
// at the data stream level.
func findDataStreamTestFolderPaths(packageRootPath, dataStreamGlob, testTypeGlob string) ([]string, error) {
testFoldersGlob := filepath.Join(packageRootPath, "data_stream", dataStreamGlob, "_dev", "test", testTypeGlob)
paths, err := filepath.Glob(testFoldersGlob)
if err != nil {
return nil, fmt.Errorf("error finding test folders: %w", err)
}
return paths, err
}
// findPackageTestFolderPaths returns the existing paths that match
// <packageRootPath>/_dev/test/<testTypeGlob>, that is, tests defined at the
// package level. testTypeGlob may contain filepath.Glob patterns. An error is
// returned when the resulting pattern is malformed.
func findPackageTestFolderPaths(packageRootPath, testTypeGlob string) ([]string, error) {
	pattern := filepath.Join(packageRootPath, "_dev", "test", testTypeGlob)

	matches, globErr := filepath.Glob(pattern)
	if globErr != nil {
		return nil, fmt.Errorf("error finding test folders: %w", globErr)
	}
	return matches, nil
}
// PackageHasDataStreams reports whether packages of the manifest's type have
// data streams: true for "integration" packages, false for "input" and
// "content" packages. Any other package type results in an error.
func PackageHasDataStreams(manifest *packages.PackageManifest) (bool, error) {
	if manifest.Type == "integration" {
		return true, nil
	}
	if manifest.Type == "input" || manifest.Type == "content" {
		return false, nil
	}
	return false, fmt.Errorf("unexpected package type %q", manifest.Type)
}
// AnySkipConfig returns the first non-nil skip configuration among configs, in
// argument order, or nil when there is none.
func AnySkipConfig(configs ...*SkipConfig) *SkipConfig {
	for i := range configs {
		if candidate := configs[i]; candidate != nil {
			return candidate
		}
	}
	return nil
}