internal/benchrunner/runners/stream/runner.go (533 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package stream
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib"
"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/config"
"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/fields"
"github.com/elastic/elastic-package/internal/benchrunner"
"github.com/elastic/elastic-package/internal/benchrunner/reporters"
"github.com/elastic/elastic-package/internal/benchrunner/runners/common"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/multierror"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/packages/installer"
"github.com/elastic/elastic-package/internal/servicedeployer"
"github.com/elastic/elastic-package/internal/wait"
)
const numberOfEvents = 100
type runner struct {
options Options
scenarios map[string]*scenario
svcInfo servicedeployer.ServiceInfo
runtimeDataStreams map[string]string
generators map[string]genlib.Generator
backFillGenerators map[string]genlib.Generator
// Execution order of following handlers is defined in runner.TearDown() method.
removePackageHandler func(context.Context) error
wipeDataStreamHandler func(context.Context) error
}
func NewStreamBenchmark(opts Options) benchrunner.Runner {
return &runner{options: opts}
}
func (r *runner) SetUp(ctx context.Context) error {
return r.setUp(ctx)
}
func StaticValidation(ctx context.Context, opts Options, dataStreamName string) (bool, error) {
runner := runner{options: opts}
err := runner.initialize()
if err != nil {
return false, err
}
hasBenchmark, err := runner.validateScenario(ctx, dataStreamName)
return hasBenchmark, err
}
// Run runs the system benchmarks defined under the given folder
func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) {
return nil, r.run(ctx)
}
func (r *runner) TearDown(ctx context.Context) error {
if !r.options.PerformCleanup {
r.removePackageHandler = nil
r.wipeDataStreamHandler = nil
return nil
}
// Avoid cancellations during cleanup.
cleanupCtx := context.WithoutCancel(ctx)
var merr multierror.Error
if r.removePackageHandler != nil {
if err := r.removePackageHandler(cleanupCtx); err != nil {
merr = append(merr, err)
}
r.removePackageHandler = nil
}
if r.wipeDataStreamHandler != nil {
if err := r.wipeDataStreamHandler(cleanupCtx); err != nil {
merr = append(merr, err)
}
r.wipeDataStreamHandler = nil
}
if len(merr) == 0 {
return nil
}
return merr
}
func (r *runner) initialize() error {
r.generators = make(map[string]genlib.Generator)
r.backFillGenerators = make(map[string]genlib.Generator)
pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
if err != nil {
return fmt.Errorf("reading package manifest failed: %w", err)
}
scenarios, err := readScenarios(r.options.PackageRootPath, r.options.BenchName, pkgManifest.Name, pkgManifest.Version)
if err != nil {
return err
}
r.scenarios = scenarios
return nil
}
func (r *runner) validateScenario(ctx context.Context, dataStreamName string) (bool, error) {
for scenarioName, scenario := range r.scenarios {
if scenario.DataStream.Name != dataStreamName {
continue
}
generator, _, err := r.createGenerator(ctx, scenarioName, scenario)
if err != nil {
return true, err
}
for i := 0; i < numberOfEvents; i++ {
buf := bytes.NewBufferString("")
err := generator.Emit(buf)
if err != nil {
return true, fmt.Errorf("[%s] error while generating event: %w", scenarioName, err)
}
// check whether the generated event is valid json
var event map[string]any
err = json.Unmarshal(buf.Bytes(), &event)
if err != nil {
return true, fmt.Errorf("[%s] failed to unmarshal json event: %w, generated output: %s", scenarioName, err, buf.String())
}
}
return true, nil
}
return false, nil
}
func (r *runner) setUp(ctx context.Context) error {
err := r.initialize()
if err != nil {
return err
}
err = r.collectGenerators(ctx)
if err != nil {
return fmt.Errorf("can't initialize generator: %w", err)
}
r.runtimeDataStreams = make(map[string]string)
r.svcInfo.Test.RunID = common.NewRunID()
pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
if err != nil {
return fmt.Errorf("reading package manifest failed: %w", err)
}
if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}
for scenarioName, scenario := range r.scenarios {
var err error
dataStreamManifest, err := packages.ReadDataStreamManifest(
filepath.Join(
common.DataStreamPath(r.options.PackageRootPath, scenario.DataStream.Name),
packages.DataStreamManifestFile,
),
)
if err != nil {
return fmt.Errorf("reading data stream manifest failed: %w", err)
}
r.runtimeDataStreams[scenarioName] = fmt.Sprintf(
"%s-%s.%s-ep",
dataStreamManifest.Type,
pkgManifest.Name,
dataStreamManifest.Name,
)
}
if !r.options.PerformCleanup {
return nil
}
if err := r.wipeDataStreamsOnSetup(ctx); err != nil {
return fmt.Errorf("error cleaning up old data in data streams: %w", err)
}
cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
totalHits := 0
for _, runtimeDataStream := range r.runtimeDataStreams {
hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, runtimeDataStream)
if err != nil {
return false, err
}
totalHits += hits
}
return totalHits == 0, nil
}, 5*time.Second, 2*time.Minute)
if err != nil || !cleared {
if err == nil {
err = errors.New("unable to clear previous data")
}
return err
}
return nil
}
func (r *runner) wipeDataStreamsOnSetup(ctx context.Context) error {
// Delete old data
logger.Debug("deleting old data in data stream...")
r.wipeDataStreamHandler = func(ctx context.Context) error {
logger.Debugf("deleting data in data stream...")
for _, runtimeDataStream := range r.runtimeDataStreams {
if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil {
return fmt.Errorf("error deleting data in data stream: %w", err)
}
}
return nil
}
for _, runtimeDataStream := range r.runtimeDataStreams {
if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil {
return fmt.Errorf("error deleting data in data stream: %w", err)
}
}
return nil
}
func (r *runner) installPackage(ctx context.Context) error {
return r.installPackageFromPackageRoot(ctx)
}
func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
logger.Debug("Installing package...")
installer, err := installer.NewForPackage(ctx, installer.Options{
Kibana: r.options.KibanaClient,
RootPath: r.options.PackageRootPath,
SkipValidation: true,
})
if err != nil {
return fmt.Errorf("failed to initialize package installer: %w", err)
}
_, err = installer.Install(ctx)
if err != nil {
return fmt.Errorf("failed to install package: %w", err)
}
r.removePackageHandler = func(ctx context.Context) error {
if err := installer.Uninstall(ctx); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}
return nil
}
return nil
}
func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error {
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body,
r.options.ESAPI.DeleteByQuery.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
// Unavailable index is ok, this means that data is already not there.
return nil
}
if resp.IsError() {
return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
}
return nil
}
func (r *runner) initializeGenerator(tpl []byte, config genlib.Config, fields genlib.Fields, scenario *scenario, backFill time.Duration, totEvents uint64) (genlib.Generator, error) {
timestampConfig := genlib.ConfigField{Name: r.options.TimestampField}
if backFill < 0 {
timestampConfig.Period = backFill
}
config.SetField(r.options.TimestampField, timestampConfig)
switch scenario.Corpora.Generator.Template.Type {
default:
logger.Debugf("unknown generator template type %q, defaulting to \"placeholder\"", scenario.Corpora.Generator.Template.Type)
fallthrough
case "", "placeholder":
return genlib.NewGeneratorWithCustomTemplate(tpl, config, fields, totEvents)
case "gotext":
return genlib.NewGeneratorWithTextTemplate(tpl, config, fields, totEvents)
}
}
func (r *runner) collectGenerators(ctx context.Context) error {
for scenarioName, scenario := range r.scenarios {
generator, backfillGenerator, err := r.createGenerator(ctx, scenarioName, scenario)
if err != nil {
return err
}
r.generators[scenarioName] = generator
if backfillGenerator != nil {
r.backFillGenerators[scenarioName] = backfillGenerator
}
}
return nil
}
func (r *runner) createGenerator(ctx context.Context, scenarioName string, scenario *scenario) (genlib.Generator, genlib.Generator, error) {
config, err := r.getGeneratorConfig(scenario)
if err != nil {
return nil, nil, fmt.Errorf("failed to obtain generator config for scenario %q: %w", scenarioName, err)
}
fields, err := r.getGeneratorFields(ctx, scenario)
if err != nil {
return nil, nil, fmt.Errorf("failed to obtain fields from generator for scenario %q: %w", scenarioName, err)
}
tpl, err := r.getGeneratorTemplate(scenario)
if err != nil {
return nil, nil, fmt.Errorf("failed to obtain template from for scenario %q: %w", scenarioName, err)
}
genlib.InitGeneratorTimeNow(time.Now())
genlib.InitGeneratorRandSeed(time.Now().UnixNano())
generator, err := r.initializeGenerator(tpl, *config, fields, scenario, 0, 0)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize backfill generator for scenario %q: %w", scenarioName, err)
}
if r.options.BackFill >= 0 {
return generator, nil, nil
}
// backfill is a negative duration, make it positive, find how many periods in the backfill and multiply by events for periodk
totEvents := uint64((-1*r.options.BackFill)/r.options.PeriodDuration) * r.options.EventsPerPeriod
backFillGenerator, err := r.initializeGenerator(tpl, *config, fields, scenario, r.options.BackFill, totEvents)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize backfill generator for scenario %q: %w", scenarioName, err)
}
return generator, backFillGenerator, nil
}
func (r *runner) getGeneratorConfig(scenario *scenario) (*config.Config, error) {
var (
data []byte
err error
)
if scenario.Corpora.Generator.Config.Path != "" {
configPath := filepath.Clean(filepath.Join(devPath, scenario.Corpora.Generator.Config.Path))
configPath = os.ExpandEnv(configPath)
if _, err := os.Stat(configPath); err != nil {
return nil, fmt.Errorf("can't find config file %s: %w", configPath, err)
}
data, err = os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("can't read config file %s: %w", configPath, err)
}
} else if len(scenario.Corpora.Generator.Config.Raw) > 0 {
data, err = yaml.Marshal(scenario.Corpora.Generator.Config.Raw)
if err != nil {
return nil, fmt.Errorf("can't parse raw generator config: %w", err)
}
}
cfg, err := config.LoadConfigFromYaml(data)
if err != nil {
return nil, fmt.Errorf("can't get generator config: %w", err)
}
return &cfg, nil
}
func (r *runner) getGeneratorFields(ctx context.Context, scenario *scenario) (fields.Fields, error) {
var (
data []byte
err error
)
if scenario.Corpora.Generator.Fields.Path != "" {
fieldsPath := filepath.Clean(filepath.Join(devPath, scenario.Corpora.Generator.Fields.Path))
fieldsPath = os.ExpandEnv(fieldsPath)
if _, err := os.Stat(fieldsPath); err != nil {
return nil, fmt.Errorf("can't find fields file %s: %w", fieldsPath, err)
}
data, err = os.ReadFile(fieldsPath)
if err != nil {
return nil, fmt.Errorf("can't read fields file %s: %w", fieldsPath, err)
}
} else if len(scenario.Corpora.Generator.Fields.Raw) > 0 {
data, err = yaml.Marshal(scenario.Corpora.Generator.Fields.Raw)
if err != nil {
return nil, fmt.Errorf("can't parse raw generator fields: %w", err)
}
}
fields, err := fields.LoadFieldsWithTemplateFromString(ctx, string(data))
if err != nil {
return nil, fmt.Errorf("could not load fields yaml: %w", err)
}
return fields, nil
}
func (r *runner) getGeneratorTemplate(scenario *scenario) ([]byte, error) {
var (
data []byte
err error
)
if scenario.Corpora.Generator.Template.Path != "" {
tplPath := filepath.Clean(filepath.Join(devPath, scenario.Corpora.Generator.Template.Path))
tplPath = os.ExpandEnv(tplPath)
if _, err := os.Stat(tplPath); err != nil {
return nil, fmt.Errorf("can't find template file %s: %w", tplPath, err)
}
data, err = os.ReadFile(tplPath)
if err != nil {
return nil, fmt.Errorf("can't read template file %s: %w", tplPath, err)
}
} else if len(scenario.Corpora.Generator.Template.Raw) > 0 {
data = []byte(scenario.Corpora.Generator.Template.Raw)
}
return data, nil
}
func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *bytes.Buffer, generator genlib.Generator, bulkBodyBuilder strings.Builder) (strings.Builder, error) {
err := generator.Emit(buf)
if err != nil {
return bulkBodyBuilder, err
}
var event map[string]any
err = json.Unmarshal(buf.Bytes(), &event)
if err != nil {
logger.Debugf("Problem found when unmarshalling document: %s", buf.String())
return bulkBodyBuilder, fmt.Errorf("failed to unmarshal json event, check your benchmark template for scenario %s: %w", scenarioName, err)
}
enriched := r.enrichEventWithBenchmarkMetadata(event)
src, err := json.Marshal(enriched)
if err != nil {
return bulkBodyBuilder, err
}
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"create\":{\"_index\":\"%s\"}}\n", indexName))
bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
buf.Reset()
return bulkBodyBuilder, nil
}
func (r *runner) performBulkRequest(ctx context.Context, bulkRequest string) error {
resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest),
r.options.ESAPI.Bulk.WithContext(ctx),
)
if err != nil {
return err
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
type bodyErrors struct {
Errors bool `json:"errors"`
Items []any `json:"items"`
}
var errors bodyErrors
err = json.Unmarshal(body, &errors)
if err != nil {
return err
}
if errors.Errors {
logger.Debug("Error in Elasticsearch bulk request: %s", string(body))
return fmt.Errorf("%d failed", len(errors.Items))
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("%s", resp.String())
}
return nil
}
func (r *runner) run(ctx context.Context) error {
logger.Debug("streaming data...")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errC := make(chan error)
defer close(errC)
var wg sync.WaitGroup
defer wg.Wait()
for scenarioName := range r.generators {
wg.Add(1)
go func(scenarioName string) {
defer wg.Done()
err := r.runStreamGenerator(ctx, scenarioName)
if err != nil {
errC <- err
}
}(scenarioName)
}
for scenarioName := range r.backFillGenerators {
wg.Add(1)
go func(scenarioName string) {
defer wg.Done()
err := r.runBackfillGenerator(ctx, scenarioName)
if err != nil {
errC <- err
}
}(scenarioName)
}
var err error
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-errC:
cancel()
}
// Ensure no goroutine is blocked sending errors.
go func() {
for range errC {
}
}()
return err
}
func (r *runner) runStreamGenerator(ctx context.Context, scenarioName string) error {
generator := r.generators[scenarioName]
indexName := r.runtimeDataStreams[scenarioName]
ticker := time.NewTicker(r.options.PeriodDuration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
logger.Debugf("bulk request of %d events on %s...", r.options.EventsPerPeriod, indexName)
var bulkBodyBuilder strings.Builder
buf := bytes.NewBufferString("")
for i := uint64(0); i < r.options.EventsPerPeriod; i++ {
var err error
bulkBodyBuilder, err = r.collectBulkRequestBody(indexName, scenarioName, buf, generator, bulkBodyBuilder)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("error while generating event for streaming: %w", err)
}
}
err := r.performBulkRequest(ctx, bulkBodyBuilder.String())
if err != nil {
return fmt.Errorf("error performing bulk request: %w", err)
}
}
return nil
}
func (r *runner) runBackfillGenerator(ctx context.Context, scenarioName string) error {
var bulkBodyBuilder strings.Builder
generator := r.backFillGenerators[scenarioName]
indexName := r.runtimeDataStreams[scenarioName]
logger.Debugf("bulk request of %s backfill events on %s...", r.options.BackFill.String(), indexName)
buf := bytes.NewBufferString("")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var err error
bulkBodyBuilder, err = r.collectBulkRequestBody(indexName, scenarioName, buf, generator, bulkBodyBuilder)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("error while generating event for streaming: %w", err)
}
}
return r.performBulkRequest(ctx, bulkBodyBuilder.String())
}
type benchMeta struct {
Info struct {
Benchmark string `json:"benchmark"`
RunID string `json:"run_id"`
} `json:"info"`
}
func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]any) map[string]interface{} {
var m benchMeta
m.Info.Benchmark = r.options.BenchName
m.Info.RunID = r.svcInfo.Test.RunID
e["benchmark_metadata"] = m
return e
}