internal/benchrunner/runners/rally/runner.go (972 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package rally import ( "bufio" "bytes" "context" "encoding/csv" "encoding/json" "errors" "fmt" "io" "net/http" "os" "os/exec" "path/filepath" "strings" "text/template" "time" "github.com/magefile/mage/sh" "gopkg.in/yaml.v3" "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib" "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/config" "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/fields" "github.com/elastic/elastic-package/internal/benchrunner" "github.com/elastic/elastic-package/internal/benchrunner/reporters" "github.com/elastic/elastic-package/internal/benchrunner/runners/common" "github.com/elastic/elastic-package/internal/configuration/locations" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/multierror" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/packages/installer" "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/stack" "github.com/elastic/elastic-package/internal/wait" ) const ( // RallyCorpusAgentDir is folder path where rally corpora files produced by the service // are stored on the Rally container's filesystem. RallyCorpusAgentDir = "/tmp/rally_corpus" rallyTrackTemplateForTSDB = `{% import "rally.helpers" as rally with context %} { "version": 2, "description": "Track for [[.DataStream]]", "datastream": [ { "name": "[[.DataStream]]", "body": "[[.CorpusFilename]]" } ], "corpora": [ { "name": "[[.CorpusFilename]]", "documents": [ { "target-data-stream": "[[.DataStream]]", "source-file": "[[.CorpusFilename]]", "document-count": [[.CorpusDocsCount]], "uncompressed-bytes": [[.CorpusSizeInBytes]] } ] } ], "schedule": [ { "operation": { "operation-type": "create-composable-template", "template": "[[.ComposableTemplate]]", "body": [[.IndexTemplate]] }, "clients": 1 }, { "operation": { "operation-type": "bulk", "bulk-size": {{bulk_size | default(5000)}}, "ingest-percentage": {{ingest_percentage | default(100)}} }, "clients": {{bulk_indexing_clients | default(8)}} }, { "operation": { "operation-type": "delete-composable-template", "template": "[[.ComposableTemplate]]", "only-if-exists": true, "delete-matching-indices": false, "index_patterns": ["[[.IndexPattern]]"] }, "clients": 1 } ] }` rallyTrackTemplate = `{% import "rally.helpers" as rally with context %} { "version": 2, "description": "Track for [[.DataStream]]", "datastream": [ { "name": "[[.DataStream]]", "body": "[[.CorpusFilename]]" } ], "corpora": [ { "name": "[[.CorpusFilename]]", "documents": [ { "target-data-stream": "[[.DataStream]]", "source-file": "[[.CorpusFilename]]", "document-count": [[.CorpusDocsCount]], "uncompressed-bytes": [[.CorpusSizeInBytes]] } ] } ], "schedule": [ { "operation": { "operation-type": "bulk", "bulk-size": {{bulk_size | default(500)}}, "ingest-percentage": {{ingest_percentage | default(100)}} }, "clients": {{bulk_indexing_clients | default(1)}} } ] }` ) var ErrDryRun = errors.New("dry run: rally benchmark not executed") type rallyStat struct { Metric string Task string Value any Unit string } type runner struct { options Options scenario *scenario svcInfo servicedeployer.ServiceInfo runtimeDataStream string indexTemplateBody string pipelinePrefix string isTSDB bool generator genlib.Generator mcollector *collector corpusFile string trackFile string reportFile string // Execution order of following handlers is defined in runner.TearDown() method. persistRallyTrackHandler func(context.Context) error removePackageHandler func(context.Context) error wipeDataStreamHandler func(context.Context) error clearCorporaHandler func(context.Context) error clearTrackHandler func(context.Context) error } func NewRallyBenchmark(opts Options) benchrunner.Runner { return &runner{options: opts} } func (r *runner) SetUp(ctx context.Context) error { return r.setUp(ctx) } // Run runs the system benchmarks defined under the given folder func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) { return r.run(ctx) } func (r *runner) TearDown(ctx context.Context) error { if r.options.DeferCleanup > 0 { logger.Debugf("waiting for %s before tearing down...", r.options.DeferCleanup) select { case <-time.After(r.options.DeferCleanup): case <-ctx.Done(): } } // Avoid cancellations during cleanup. cleanupCtx := context.WithoutCancel(ctx) var merr multierror.Error if r.persistRallyTrackHandler != nil { if err := r.persistRallyTrackHandler(cleanupCtx); err != nil { merr = append(merr, err) } r.persistRallyTrackHandler = nil } if r.removePackageHandler != nil { if err := r.removePackageHandler(cleanupCtx); err != nil { merr = append(merr, err) } r.removePackageHandler = nil } if r.wipeDataStreamHandler != nil { if err := r.wipeDataStreamHandler(cleanupCtx); err != nil { merr = append(merr, err) } r.wipeDataStreamHandler = nil } if r.clearCorporaHandler != nil { if err := r.clearCorporaHandler(cleanupCtx); err != nil { merr = append(merr, err) } r.clearCorporaHandler = nil } if len(merr) == 0 { return nil } return merr } func (r *runner) createRallyTrackDir(locationManager *locations.LocationManager) error { outputDir := filepath.Join(locationManager.RallyCorpusDir(), r.svcInfo.Test.RunID) if err := os.MkdirAll(outputDir, 0755); err != nil { return fmt.Errorf("failed to create output directory: %w", err) } return nil } func (r *runner) setUp(ctx context.Context) error { locationManager, err := locations.NewLocationManager() if err != nil { return fmt.Errorf("reading service logs directory failed: %w", err) } rallyCorpusDir := locationManager.RallyCorpusDir() r.svcInfo.Logs.Folder.Local = rallyCorpusDir r.svcInfo.Logs.Folder.Agent = RallyCorpusAgentDir r.svcInfo.Test.RunID = common.NewRunID() outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.svcInfo.Test.RunID) if err != nil { return fmt.Errorf("could not create output dir for terraform deployer %w", err) } r.svcInfo.OutputDir = outputDir err = r.createRallyTrackDir(locationManager) if err != nil { return fmt.Errorf("could not create local rally track dir %w", err) } pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath) if err != nil { return fmt.Errorf("reading package manifest failed: %w", err) } scenario, err := readConfig(r.options.PackageRootPath, r.options.BenchName, pkgManifest.Name, pkgManifest.Version) if err != nil { return err } r.scenario = scenario if err = r.installPackage(ctx); err != nil { return fmt.Errorf("error installing package: %w", err) } if r.scenario.Corpora.Generator != nil && len(r.options.CorpusAtPath) == 0 { var err error r.generator, err = r.initializeGenerator(ctx) if err != nil { return fmt.Errorf("can't initialize generator: %w", err) } } dataStreamManifest, err := packages.ReadDataStreamManifest( filepath.Join( common.DataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name), packages.DataStreamManifestFile, ), ) if err != nil { return fmt.Errorf("reading data stream manifest failed: %w", err) } r.runtimeDataStream = fmt.Sprintf( "%s-%s.%s-ep", dataStreamManifest.Type, pkgManifest.Name, dataStreamManifest.Name, ) r.pipelinePrefix = fmt.Sprintf( "%s-%s.%s-%s", dataStreamManifest.Type, pkgManifest.Name, dataStreamManifest.Name, r.scenario.Version, ) if dataStreamManifest.Elasticsearch != nil { r.isTSDB = dataStreamManifest.Elasticsearch.IndexMode == "time_series" } if r.isTSDB { indexTemplate := fmt.Sprintf( "%s-%s.%s", dataStreamManifest.Type, pkgManifest.Name, dataStreamManifest.Name, ) r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate) if err != nil { return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err) } } if err := r.wipeDataStreamOnSetup(ctx); err != nil { return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err) } cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream) return hits == 0, err }, 5*time.Second, 2*time.Minute) if err != nil || !cleared { if err == nil { err = errors.New("unable to clear previous data") } return err } return nil } func (r *runner) extractSimulatedTemplate(ctx context.Context, indexTemplate string) (string, error) { simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate( r.options.ESAPI.Indices.SimulateTemplate.WithContext(ctx), r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate), ) if err != nil { return "", fmt.Errorf("error simulating template from composable template: %s: %w", indexTemplate, err) } defer simulateTemplate.Body.Close() if simulateTemplate.IsError() { return "", fmt.Errorf("error simulating template from composable template: %s: %s", indexTemplate, simulateTemplate.String()) } templateBody, err := io.ReadAll(simulateTemplate.Body) if err != nil { return "", fmt.Errorf("error reading simulated template from composable template: %s: %w", indexTemplate, err) } var simulatedTemplate map[string]interface{} err = json.Unmarshal(templateBody, &simulatedTemplate) if err != nil { return "", fmt.Errorf("error unmarshaling simulated template from composable template: %s: %w", indexTemplate, err) } simulatedTemplate["priority"] = 1000 simulatedTemplate["index_patterns"] = []string{indexTemplate + "-ep"} indexTimeSeries := map[string]interface{}{ "start_time": "2000-01-01T00:00:00Z", "end_time": "2099-12-31T23:59:59Z", } simulatedTemplate["template"].(map[string]interface{})["settings"].(map[string]interface{})["index"].(map[string]interface{})["time_series"] = indexTimeSeries delete(simulatedTemplate, "overlapping") newTemplate, err := json.Marshal(simulatedTemplate) if err != nil { return "", fmt.Errorf("error marshaling simulated template from composable template: %s: %w", indexTemplate, err) } return string(newTemplate), nil } func (r *runner) wipeDataStreamOnSetup(ctx context.Context) error { // Delete old data logger.Debug("deleting old data in data stream...") r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } return nil } return r.deleteDataStreamDocs(ctx, r.runtimeDataStream) } func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) { r.startMetricsColletion(ctx) defer r.mcollector.stop() var corpusDocCount uint64 // if there is a generator config, generate the data, unless a corpus path is set if r.generator != nil && len(r.options.CorpusAtPath) == 0 { logger.Debugf("generating corpus data to %s...", r.svcInfo.Logs.Folder.Local) corpusDocCount, err = r.runGenerator(r.svcInfo.Logs.Folder.Local) if err != nil { return nil, fmt.Errorf("can't generate benchmarks data corpus for data stream: %w", err) } } if len(r.options.CorpusAtPath) > 0 { logger.Debugf("reading corpus data from %s...", r.options.CorpusAtPath) corpusDocCount, err = r.copyCorpusFile(r.options.CorpusAtPath, r.svcInfo.Logs.Folder.Local) if err != nil { return nil, fmt.Errorf("can't read benchmarks data corpus for data stream: %w", err) } } if corpusDocCount == 0 { return nil, errors.New("can't find documents in the corpus for data stream") } if err := r.createRallyTrack(corpusDocCount, r.svcInfo.Logs.Folder.Local); err != nil { return nil, fmt.Errorf("can't create benchmarks data rally track for data stream: %w", err) } if r.options.DryRun { dummy := reporters.NewReport(r.scenario.Package, nil) return dummy, ErrDryRun } rallyStats, err := r.runRally(ctx) if err != nil { return nil, err } msum, err := r.collectAndSummarizeMetrics() if err != nil { return nil, fmt.Errorf("can't summarize metrics: %w", err) } if err := r.reindexData(ctx); err != nil { return nil, fmt.Errorf("can't reindex data: %w", err) } return createReport(r.options.BenchName, r.corpusFile, r.scenario, msum, rallyStats) } func (r *runner) installPackage(ctx context.Context) error { if len(r.options.PackageVersion) > 0 { r.scenario.Package = r.options.PackageName r.scenario.Version = r.options.PackageVersion return r.installPackageFromRegistry(ctx, r.options.PackageName, r.options.PackageVersion) } return r.installPackageFromPackageRoot(ctx) } func (r *runner) installPackageFromRegistry(ctx context.Context, packageName, packageVersion string) error { // POST /epm/packages/{pkgName}/{pkgVersion} // Configure package (single data stream) via Ingest Manager APIs. logger.Debug("installing package...") _, err := r.options.KibanaClient.InstallPackage(ctx, packageName, packageVersion) if err != nil { return fmt.Errorf("cannot install package %s@%s: %w", packageName, packageVersion, err) } r.removePackageHandler = func(ctx context.Context) error { logger.Debug("removing benchmark package...") if _, err := r.options.KibanaClient.RemovePackage(ctx, packageName, packageVersion); err != nil { return fmt.Errorf("error removing benchmark package: %w", err) } return nil } return nil } func (r *runner) installPackageFromPackageRoot(ctx context.Context) error { logger.Debug("Installing package...") installer, err := installer.NewForPackage(ctx, installer.Options{ Kibana: r.options.KibanaClient, RootPath: r.options.PackageRootPath, SkipValidation: true, }) if err != nil { return fmt.Errorf("failed to initialize package installer: %w", err) } _, err = installer.Install(ctx) if err != nil { return fmt.Errorf("failed to install package: %w", err) } r.removePackageHandler = func(ctx context.Context) error { if err := installer.Uninstall(ctx); err != nil { return fmt.Errorf("error removing benchmark package: %w", err) } return nil } return nil } func (r *runner) startMetricsColletion(ctx context.Context) { // TODO collect agent hosts metrics using system integration r.mcollector = newCollector( r.svcInfo, r.options.BenchName, *r.scenario, r.options.ESAPI, r.options.ESMetricsAPI, r.options.MetricsInterval, r.runtimeDataStream, r.pipelinePrefix, ) r.mcollector.start(ctx) } func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { r.mcollector.stop() sum, err := r.mcollector.summarize() return sum, err } func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, r.options.ESAPI.DeleteByQuery.WithContext(ctx), ) if err != nil { return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err) } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { // Unavailable index is ok, this means that data is already not there. return nil } if resp.IsError() { return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String()) } return nil } func (r *runner) initializeGenerator(ctx context.Context) (genlib.Generator, error) { totEvents := r.scenario.Corpora.Generator.TotalEvents config, err := r.getGeneratorConfig() if err != nil { return nil, err } fields, err := r.getGeneratorFields(ctx) if err != nil { return nil, err } tpl, err := r.getGeneratorTemplate() if err != nil { return nil, err } genlib.InitGeneratorTimeNow(time.Now()) genlib.InitGeneratorRandSeed(time.Now().UnixNano()) var generator genlib.Generator switch r.scenario.Corpora.Generator.Template.Type { default: logger.Debugf("unknown generator template type %q, defaulting to \"placeholder\"", r.scenario.Corpora.Generator.Template.Type) fallthrough case "", "placeholder": generator, err = genlib.NewGeneratorWithCustomTemplate(tpl, *config, fields, totEvents) case "gotext": generator, err = genlib.NewGeneratorWithTextTemplate(tpl, *config, fields, totEvents) } if err != nil { return nil, err } return generator, nil } func (r *runner) getGeneratorConfig() (*config.Config, error) { var ( data []byte err error ) if r.scenario.Corpora.Generator.Config.Path != "" { configPath := filepath.Clean(filepath.Join(devPath, r.scenario.Corpora.Generator.Config.Path)) configPath = os.ExpandEnv(configPath) if _, err := os.Stat(configPath); err != nil { return nil, fmt.Errorf("can't find config file %s: %w", configPath, err) } data, err = os.ReadFile(configPath) if err != nil { return nil, fmt.Errorf("can't read config file %s: %w", configPath, err) } } else if len(r.scenario.Corpora.Generator.Config.Raw) > 0 { data, err = yaml.Marshal(r.scenario.Corpora.Generator.Config.Raw) if err != nil { return nil, fmt.Errorf("can't parse raw generator config: %w", err) } } cfg, err := config.LoadConfigFromYaml(data) if err != nil { return nil, fmt.Errorf("can't get generator config: %w", err) } return &cfg, nil } func (r *runner) getGeneratorFields(ctx context.Context) (fields.Fields, error) { var ( data []byte err error ) if r.scenario.Corpora.Generator.Fields.Path != "" { fieldsPath := filepath.Clean(filepath.Join(devPath, r.scenario.Corpora.Generator.Fields.Path)) fieldsPath = os.ExpandEnv(fieldsPath) if _, err := os.Stat(fieldsPath); err != nil { return nil, fmt.Errorf("can't find fields file %s: %w", fieldsPath, err) } data, err = os.ReadFile(fieldsPath) if err != nil { return nil, fmt.Errorf("can't read fields file %s: %w", fieldsPath, err) } } else if len(r.scenario.Corpora.Generator.Fields.Raw) > 0 { data, err = yaml.Marshal(r.scenario.Corpora.Generator.Fields.Raw) if err != nil { return nil, fmt.Errorf("can't parse raw generator fields: %w", err) } } fields, err := fields.LoadFieldsWithTemplateFromString(ctx, string(data)) if err != nil { return nil, fmt.Errorf("could not load fields yaml: %w", err) } return fields, nil } func (r *runner) getGeneratorTemplate() ([]byte, error) { var ( data []byte err error ) if r.scenario.Corpora.Generator.Template.Path != "" { tplPath := filepath.Clean(filepath.Join(devPath, r.scenario.Corpora.Generator.Template.Path)) tplPath = os.ExpandEnv(tplPath) if _, err := os.Stat(tplPath); err != nil { return nil, fmt.Errorf("can't find template file %s: %w", tplPath, err) } data, err = os.ReadFile(tplPath) if err != nil { return nil, fmt.Errorf("can't read template file %s: %w", tplPath, err) } } else if len(r.scenario.Corpora.Generator.Template.Raw) > 0 { data = []byte(r.scenario.Corpora.Generator.Template.Raw) } return data, nil } func (r *runner) runGenerator(destDir string) (uint64, error) { corpusFile, err := os.CreateTemp(destDir, "corpus-*") if err != nil { return 0, fmt.Errorf("cannot not create rally corpus file: %w", err) } defer corpusFile.Close() if err := corpusFile.Chmod(os.ModePerm); err != nil { return 0, fmt.Errorf("cannot not set permission to rally corpus file: %w", err) } buf := bytes.NewBufferString("") var corpusDocsCount uint64 for { err := r.generator.Emit(buf) if err == io.EOF { break } if err != nil { return 0, fmt.Errorf("error while generating content for the rally corpus file: %w", err) } // TODO: this should be taken care of by the corpus generator tool, once it will be done let's remove this event := strings.Replace(buf.String(), "\n", "", -1) if _, err = corpusFile.Write([]byte(event)); err != nil { return 0, fmt.Errorf("error while saving content to the rally corpus file: %w", err) } if _, err = corpusFile.Write([]byte("\n")); err != nil { return 0, fmt.Errorf("error while saving newline to the rally corpus file: %w", err) } buf.Reset() corpusDocsCount += 1 } r.corpusFile = corpusFile.Name() r.clearCorporaHandler = func(ctx context.Context) error { return errors.Join( os.Remove(r.corpusFile), ) } return corpusDocsCount, r.generator.Close() } // This seems to be the most performing way to calculate number of lines from an `io.Reader` (see: https://stackoverflow.com/a/52153000) func countLine(r io.Reader) (uint64, error) { var count uint64 const lineBreak = '\n' buf := make([]byte, bufio.MaxScanTokenSize) for { bufferSize, err := r.Read(buf) if err != nil && err != io.EOF { return 0, err } var buffPosition int for { i := bytes.IndexByte(buf[buffPosition:], lineBreak) if i == -1 || bufferSize == buffPosition { break } buffPosition += i + 1 count++ } if err == io.EOF { break } } return count, nil } func (r *runner) createRallyTrack(corpusDocsCount uint64, destDir string) error { trackFile, err := os.CreateTemp(destDir, "track-*.json") if err != nil { return fmt.Errorf("cannot not create rally track file: %w", err) } r.trackFile = trackFile.Name() rallyTrackContent, err := generateRallyTrack(r.runtimeDataStream, r.indexTemplateBody, r.corpusFile, corpusDocsCount, r.isTSDB) if err != nil { return fmt.Errorf("cannot not generate rally track content: %w", err) } err = os.WriteFile(r.trackFile, rallyTrackContent, os.ModePerm) if err != nil { return fmt.Errorf("cannot not save rally track content to file: %w", err) } defer trackFile.Close() reportFile, err := os.CreateTemp(destDir, "report-*.csv") if err != nil { return fmt.Errorf("cannot not save rally report file: %w", err) } defer reportFile.Close() r.reportFile = reportFile.Name() if r.options.RallyTrackOutputDir != "" { r.persistRallyTrackHandler = func(ctx context.Context) error { err := os.MkdirAll(r.options.RallyTrackOutputDir, 0755) if err != nil { return fmt.Errorf("cannot not create rally track output dir: %w", err) } persistedRallyTrack := filepath.Join(r.options.RallyTrackOutputDir, fmt.Sprintf("track-%s.json", r.runtimeDataStream)) err = sh.Copy(persistedRallyTrack, trackFile.Name()) if err != nil { return fmt.Errorf("cannot not copy rally track to file in output dir: %w", err) } persistedCorpus := filepath.Join(r.options.RallyTrackOutputDir, filepath.Base(r.corpusFile)) err = sh.Copy(persistedCorpus, r.corpusFile) if err != nil { err = fmt.Errorf("cannot not copy rally corpus to file in output dir: %w", err) return errors.Join(os.Remove(persistedRallyTrack), err) } logger.Infof("rally track and corpus saved at: %s", r.options.RallyTrackOutputDir) return nil } } r.clearTrackHandler = func(ctx context.Context) error { return errors.Join( os.Remove(r.trackFile), os.Remove(r.reportFile), ) } return nil } func (r *runner) copyCorpusFile(corpusPath, destDir string) (uint64, error) { corpusFile, err := os.CreateTemp(destDir, "corpus-*") if err != nil { return 0, fmt.Errorf("cannot not create rally corpus file: %w", err) } defer corpusFile.Close() if err := corpusFile.Chmod(os.ModePerm); err != nil { return 0, fmt.Errorf("cannot not set permission to rally corpus file: %w", err) } existingCorpus, err := os.Open(corpusPath) if err != nil { return 0, fmt.Errorf("error while reading content for the existing rally corpus file: %w", err) } defer existingCorpus.Close() corpusDocsCount, err := countLine(existingCorpus) if err != nil { return 0, fmt.Errorf("error while counting docs for the existing rally corpus file: %w", err) } offset, err := existingCorpus.Seek(0, io.SeekStart) if err != nil { return 0, fmt.Errorf("error while resetting content for the existing rally corpus file: %w", err) } if offset != 0 { return 0, errors.New("error while resetting content for the existing rally corpus file") } _, err = io.Copy(corpusFile, existingCorpus) if err != nil { return 0, fmt.Errorf("error while coping content for the existing rally corpus file: %w", err) } r.corpusFile = corpusFile.Name() r.clearCorporaHandler = func(ctx context.Context) error { return errors.Join( os.Remove(r.corpusFile), ) } return corpusDocsCount, nil } func (r *runner) runRally(ctx context.Context) ([]rallyStat, error) { logger.Debug("running rally...") profileConfig, err := stack.StackInitConfig(r.options.Profile) if err != nil { return nil, fmt.Errorf("failed to load config from profile: %w", err) } elasticsearchHost, found := os.LookupEnv(stack.ElasticsearchHostEnv) if !found { status, err := stack.Status(ctx, stack.Options{Profile: r.options.Profile}) if err != nil { return nil, fmt.Errorf("failed to check status of stack in current profile: %w", err) } if len(status) == 0 { return nil, stack.ErrUnavailableStack } elasticsearchHost = profileConfig.ElasticsearchHostPort logger.Debugf("Configuring rally with Elasticsearch host from current profile (profile: %s, host: %q)", r.options.Profile.ProfileName, elasticsearchHost) } elasticsearchPassword, found := os.LookupEnv(stack.ElasticsearchPasswordEnv) if !found { elasticsearchPassword = profileConfig.ElasticsearchPassword } elasticsearchUsername, found := os.LookupEnv(stack.ElasticsearchUsernameEnv) if !found { elasticsearchUsername = profileConfig.ElasticsearchUsername } _, err = exec.LookPath("esrally") if err != nil { return nil, errors.New("could not run esrally track in path: esrally not found, please follow instruction at https://esrally.readthedocs.io/en/stable/install.html") } cmd := exec.Command( "esrally", "race", "--race-id="+r.svcInfo.Test.RunID, "--report-format=csv", fmt.Sprintf(`--report-file=%s`, r.reportFile), fmt.Sprintf(`--target-hosts={"default":["%s"]}`, elasticsearchHost), fmt.Sprintf(`--track-path=%s`, r.trackFile), fmt.Sprintf(`--client-options={"default":{"basic_auth_user":"%s","basic_auth_password":"%s","use_ssl":true,"verify_certs":false}}`, elasticsearchUsername, elasticsearchPassword), "--pipeline=benchmark-only", "--kill-running-processes", ) errOutput := new(bytes.Buffer) cmd.Stderr = errOutput logger.Debugf("output command: %s", cmd) output, err := cmd.Output() if err != nil { return nil, fmt.Errorf("could not run esrally track in path: %s (stdout=%q, stderr=%q): %w", r.svcInfo.Logs.Folder.Local, output, errOutput.String(), err) } reportCSV, err := os.Open(r.reportFile) if err != nil { return nil, fmt.Errorf("could not open esrally report in path: %s: %w", r.svcInfo.Logs.Folder.Local, err) } reader := csv.NewReader(reportCSV) stats := make([]rallyStat, 0) for { record, err := reader.Read() if err == io.EOF { break } if err != nil { return nil, fmt.Errorf("could not read esrally report in path: %s (stderr=%q): %w", r.svcInfo.Logs.Folder.Local, errOutput.String(), err) } stats = append(stats, rallyStat{Metric: record[0], Task: record[1], Value: record[2], Unit: record[3]}) } return stats, nil } // reindexData will read all data generated during the benchmark and will reindex it to the metrisctore func (r *runner) reindexData(ctx context.Context) error { if !r.options.ReindexData { return nil } if r.options.ESMetricsAPI == nil { return errors.New("the option to reindex data is set, but the metricstore was not initialized") } logger.Debug("starting reindexing of data...") logger.Debug("getting original mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( r.options.ESAPI.Indices.GetMapping.WithContext(ctx), r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), ) if err != nil { return fmt.Errorf("error getting mapping: %w", err) } defer mappingRes.Body.Close() if mappingRes.IsError() { return fmt.Errorf("error getting mapping: %s", mappingRes) } body, err := io.ReadAll(mappingRes.Body) if err != nil { return fmt.Errorf("error reading mapping body: %w", err) } mappings := map[string]struct { Mappings json.RawMessage }{} if err := json.Unmarshal(body, &mappings); err != nil { return fmt.Errorf("error unmarshaling mappings: %w", err) } if len(mappings) != 1 { return fmt.Errorf("exactly 1 mapping was expected, got %d", len(mappings)) } var mapping string for _, v := range mappings { mapping = string(v.Mappings) } reader := bytes.NewReader( []byte(fmt.Sprintf(`{ "settings": {"number_of_replicas":0}, "mappings": %s }`, mapping)), ) indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.svcInfo.Test.RunID) logger.Debugf("creating %s index in metricstore...", indexName) createRes, err := r.options.ESMetricsAPI.Indices.Create( indexName, r.options.ESMetricsAPI.Indices.Create.WithContext(ctx), r.options.ESMetricsAPI.Indices.Create.WithBody(reader), ) if err != nil { return fmt.Errorf("could not create index: %w", err) } defer createRes.Body.Close() if createRes.IsError() { return fmt.Errorf("got a response error while creating index: %s", createRes) } bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`) logger.Debug("starting scrolling of events...") res, err := r.options.ESAPI.Search( r.options.ESAPI.Search.WithContext(ctx), r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), r.options.ESAPI.Search.WithSize(10000), ) if err != nil { return fmt.Errorf("error executing search: %w", err) } defer res.Body.Close() if res.IsError() { return fmt.Errorf("error executing search: %s", res) } // Iterate through the search results using the Scroll API for { var sr searchResponse if err := json.NewDecoder(res.Body).Decode(&sr); err != nil { return fmt.Errorf("error decoding search response: %w", err) } if sr.Error != nil { return fmt.Errorf("error searching for documents: %s", sr.Error.Reason) } if len(sr.Hits) == 0 { break } err := r.bulkMetrics(ctx, indexName, sr) if err != nil { return err } } logger.Debug("reindexing operation finished") return nil } type searchResponse struct { Error *struct { Reason string `json:"reason"` } `json:"error"` ScrollID string `json:"_scroll_id"` Hits []struct { ID string `json:"_id"` Source map[string]interface{} `json:"_source"` } `json:"hits"` } func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error { var bulkBodyBuilder strings.Builder for _, hit := range sr.Hits { bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) src, err := json.Marshal(enriched) if err != nil { return fmt.Errorf("error decoding _source: %w", err) } bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) } logger.Debugf("bulk request of %d events...", len(sr.Hits)) resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()), r.options.ESMetricsAPI.Bulk.WithContext(ctx), ) if err != nil { return fmt.Errorf("error performing the bulk index request: %w", err) } defer resp.Body.Close() if resp.IsError() { return fmt.Errorf("error performing the bulk index request: %s", resp.String()) } if sr.ScrollID == "" { return errors.New("error getting scroll ID") } resp, err = r.options.ESAPI.Scroll( r.options.ESAPI.Scroll.WithContext(ctx), r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), r.options.ESAPI.Scroll.WithScroll(time.Minute), ) if err != nil { return fmt.Errorf("error executing scroll: %s", err) } defer resp.Body.Close() if resp.IsError() { return fmt.Errorf("error executing scroll: %s", resp.String()) } return nil } type benchMeta struct { Info struct { Benchmark string `json:"benchmark"` RunID string `json:"run_id"` } `json:"info"` Parameters scenario `json:"parameter"` } func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[string]interface{} { var m benchMeta m.Info.Benchmark = r.options.BenchName m.Info.RunID = r.svcInfo.Test.RunID m.Parameters = *r.scenario e["benchmark_metadata"] = m return e } func generateRallyTrack(dataStream, indexTemplateBody, corpusFileName string, corpusDocsCount uint64, isTSDB bool) ([]byte, error) { t := template.New("rallytrack") templateToParse := rallyTrackTemplate if isTSDB { templateToParse = rallyTrackTemplateForTSDB } parsedTpl, err := t.Delims("[[", "]]").Parse(templateToParse) if err != nil { return nil, fmt.Errorf("error while parsing rally track template: %w", err) } corpusFile, err := os.Open(corpusFileName) if err != nil { return nil, fmt.Errorf("error while opening corpus file for rally track template: %w", err) } defer corpusFile.Close() fi, err := corpusFile.Stat() if err != nil { return nil, fmt.Errorf("error with stat on rally corpus file: %w", err) } corpusSizeInBytes := fi.Size() templateData := map[string]any{ "DataStream": dataStream, "CorpusFilename": filepath.Base(corpusFile.Name()), "CorpusDocsCount": corpusDocsCount, "CorpusSizeInBytes": corpusSizeInBytes, "ComposableTemplate": dataStream, "IndexPattern": dataStream, "IndexTemplate": indexTemplateBody, } buf := new(bytes.Buffer) err = parsedTpl.Execute(buf, templateData) if err != nil { return nil, fmt.Errorf("error on executing rally track template: %w", err) } return buf.Bytes(), nil }