internal/benchrunner/runners/system/runner.go (772 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package system
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib"
"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/config"
"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/fields"
"github.com/elastic/elastic-package/internal/benchrunner"
"github.com/elastic/elastic-package/internal/benchrunner/reporters"
"github.com/elastic/elastic-package/internal/benchrunner/runners/common"
"github.com/elastic/elastic-package/internal/configuration/locations"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/multierror"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/servicedeployer"
"github.com/elastic/elastic-package/internal/wait"
)
const (
	// ServiceLogsAgentDir is folder path where log files produced by the service
	// are stored on the Agent container's filesystem.
	ServiceLogsAgentDir = "/tmp/service_logs"

	// BenchType defining system benchmark
	BenchType benchrunner.Type = "system"
)
// runner implements benchrunner.Runner for system benchmarks: it provisions
// a benchmark policy, optionally generates a data corpus, enrolls agents,
// waits for ingestion and produces a report.
type runner struct {
	options  Options
	scenario *scenario

	// svcInfo carries the service deployer context for this run
	// (log folders, run ID, output dir).
	svcInfo     servicedeployer.ServiceInfo
	benchPolicy *kibana.Policy

	// runtimeDataStream is the backing data stream name, built as
	// <type>-<package>.<datastream>-<namespace> in setUp.
	runtimeDataStream string
	// pipelinePrefix identifies the ingest pipelines, built with the
	// package version instead of the namespace.
	pipelinePrefix string
	generator      genlib.Generator
	mcollector     *collector
	// corporaFile is the path to the generated corpus file, if any.
	corporaFile string

	// Execution order of following handlers is defined in runner.TearDown() method.
	deletePolicyHandler     func(context.Context) error
	resetAgentPolicyHandler func(context.Context) error
	shutdownServiceHandler  func(context.Context) error
	wipeDataStreamHandler   func(context.Context) error
	clearCorporaHandler     func(context.Context) error
}
// NewSystemBenchmark returns a system benchmark runner configured with the
// given options.
func NewSystemBenchmark(opts Options) benchrunner.Runner {
	r := runner{options: opts}
	return &r
}
// SetUp prepares everything the benchmark needs before running; it delegates
// to the unexported setUp.
func (r *runner) SetUp(ctx context.Context) error {
	return r.setUp(ctx)
}
// Run runs the system benchmarks defined under the given folder
// and returns the resulting report.
func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) {
	return r.run(ctx)
}
// TearDown cleans up after a benchmark run, invoking the registered cleanup
// handlers in a fixed order. When DeferCleanup is set, teardown is delayed
// by that duration (or until the context is cancelled) before starting.
// Collected handler errors are returned as a single multierror.
func (r *runner) TearDown(ctx context.Context) error {
	if r.options.DeferCleanup > 0 {
		logger.Debugf("waiting for %s before tearing down...", r.options.DeferCleanup)
		select {
		case <-time.After(r.options.DeferCleanup):
		case <-ctx.Done():
		}
	}

	// Avoid cancellations during cleanup.
	cleanupCtx := context.WithoutCancel(ctx)

	// Handlers run in this exact order. Each field is cleared after running
	// so a repeated TearDown does not invoke it twice.
	handlers := []*func(context.Context) error{
		&r.resetAgentPolicyHandler,
		&r.deletePolicyHandler,
		&r.shutdownServiceHandler,
		&r.wipeDataStreamHandler,
		&r.clearCorporaHandler,
	}

	var merr multierror.Error
	for _, handler := range handlers {
		if *handler == nil {
			continue
		}
		if err := (*handler)(cleanupCtx); err != nil {
			merr = append(merr, err)
		}
		*handler = nil
	}

	if len(merr) == 0 {
		return nil
	}
	return merr
}
// setUp provisions everything the benchmark needs before running: service
// log folders and a run ID, the scenario config, the corpus generator (if
// configured), the benchmark policy, and a clean runtime data stream.
func (r *runner) setUp(ctx context.Context) error {
	locationManager, err := locations.NewLocationManager()
	if err != nil {
		return fmt.Errorf("reading service logs directory failed: %w", err)
	}

	serviceLogsDir := locationManager.ServiceLogDir()
	r.svcInfo.Logs.Folder.Local = serviceLogsDir
	r.svcInfo.Logs.Folder.Agent = ServiceLogsAgentDir
	r.svcInfo.Test.RunID = common.NewRunID()

	outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.svcInfo.Test.RunID)
	if err != nil {
		return fmt.Errorf("could not create output dir for terraform deployer %w", err)
	}
	r.svcInfo.OutputDir = outputDir

	scenario, err := readConfig(r.options.BenchPath, r.options.BenchName, r.svcInfo)
	if err != nil {
		return err
	}
	r.scenario = scenario

	// Only scenarios that declare a generator config get a corpus generator.
	if r.scenario.Corpora.Generator != nil {
		var err error
		r.generator, err = r.initializeGenerator(ctx)
		if err != nil {
			return fmt.Errorf("can't initialize generator: %w", err)
		}
	}

	pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("reading package manifest failed: %w", err)
	}

	policy, err := r.createBenchmarkPolicy(ctx, pkgManifest)
	if err != nil {
		return err
	}
	r.benchPolicy = policy

	// Delete old data
	logger.Debug("deleting old data in data stream...")
	dataStreamManifest, err := packages.ReadDataStreamManifest(
		filepath.Join(
			common.DataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name),
			packages.DataStreamManifestFile,
		),
	)
	if err != nil {
		return fmt.Errorf("reading data stream manifest failed: %w", err)
	}

	// Data stream naming convention: <type>-<package>.<datastream>-<namespace>.
	r.runtimeDataStream = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		policy.Namespace,
	)
	// Pipelines are identified by package version rather than namespace.
	r.pipelinePrefix = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		r.scenario.Version,
	)

	r.wipeDataStreamHandler = func(ctx context.Context) error {
		logger.Debugf("deleting data in data stream...")
		if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
			return fmt.Errorf("error deleting data in data stream: %w", err)
		}
		return nil
	}

	if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
		return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
	}

	// Poll (every 5s, up to 2 minutes) until the deletion is visible.
	cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream)
		return hits == 0, err
	}, 5*time.Second, 2*time.Minute)
	if err != nil || !cleared {
		if err == nil {
			err = errors.New("unable to clear previous data")
		}
		return err
	}

	return nil
}
// run executes the benchmark: deploy the input service (if any), start
// metrics collection, generate the corpus, enroll agents, signal the
// service, wait for ingestion to settle, and build the report.
func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) {
	var service servicedeployer.DeployedService
	if r.scenario.Corpora.InputService != nil {
		s, err := r.setupService(ctx)
		// A missing deploy definition is not fatal; the benchmark can run
		// without a service deployer.
		if errors.Is(err, os.ErrNotExist) {
			logger.Debugf("No service deployer defined for this benchmark")
		} else if err != nil {
			return nil, err
		}
		service = s
	}

	r.startMetricsColletion(ctx)
	defer r.mcollector.stop()

	// if there is a generator config, generate the data
	if r.generator != nil {
		logger.Debugf("generating corpus data to %s...", r.svcInfo.Logs.Folder.Local)
		if err := r.runGenerator(r.svcInfo.Logs.Folder.Local); err != nil {
			return nil, fmt.Errorf("can't generate benchmarks data corpus for data stream: %w", err)
		}
	}

	// once data is generated, enroll agents and assign policy
	if err := r.enrollAgents(ctx); err != nil {
		return nil, err
	}

	// Signal to the service that the agent is ready (policy is assigned).
	if service != nil && r.scenario.Corpora.InputService != nil && r.scenario.Corpora.InputService.Signal != "" {
		if err = service.Signal(ctx, r.scenario.Corpora.InputService.Signal); err != nil {
			return nil, fmt.Errorf("failed to notify benchmark service: %w", err)
		}
	}

	finishedOnTime, err := r.waitUntilBenchmarkFinishes(ctx)
	if err != nil {
		return nil, err
	}
	if !finishedOnTime {
		return nil, errors.New("timeout exceeded")
	}

	msum, err := r.collectAndSummarizeMetrics()
	if err != nil {
		return nil, fmt.Errorf("can't summarize metrics: %w", err)
	}

	// Copy ingested documents to the metricstore; no-op unless the
	// reindex option is enabled (see reindexData).
	if err := r.reindexData(ctx); err != nil {
		return nil, fmt.Errorf("can't reindex data: %w", err)
	}

	return createReport(r.options.BenchName, r.corporaFile, r.scenario, msum)
}
// setupService builds a service deployer for the benchmark's "deploy"
// directory and starts the input service, registering a teardown handler
// that stops it.
func (r *runner) setupService(ctx context.Context) (servicedeployer.DeployedService, error) {
	stackVersion, err := r.options.KibanaClient.Version()
	if err != nil {
		return nil, fmt.Errorf("cannot request Kibana version: %w", err)
	}

	// Setup service.
	logger.Debug("Setting up service...")
	devDeployDir := filepath.Clean(filepath.Join(r.options.BenchPath, "deploy"))
	opts := servicedeployer.FactoryOptions{
		PackageRootPath:        r.options.PackageRootPath,
		DevDeployDir:           devDeployDir,
		Variant:                r.options.Variant,
		Profile:                r.options.Profile,
		Type:                   servicedeployer.TypeBench,
		StackVersion:           stackVersion.Version(),
		DeployIndependentAgent: false,
	}
	serviceDeployer, err := servicedeployer.Factory(opts)
	if err != nil {
		return nil, fmt.Errorf("could not create service runner: %w", err)
	}

	r.svcInfo.Name = r.scenario.Corpora.InputService.Name
	service, err := serviceDeployer.SetUp(ctx, r.svcInfo)
	if err != nil {
		return nil, fmt.Errorf("could not setup service: %w", err)
	}
	// Keep the info returned by the deployed service; presumably it is
	// enriched during SetUp — confirm against the deployer implementation.
	r.svcInfo = service.Info()

	r.shutdownServiceHandler = func(ctx context.Context) error {
		logger.Debug("tearing down service...")
		if err := service.TearDown(ctx); err != nil {
			return fmt.Errorf("error tearing down service: %w", err)
		}
		return nil
	}

	return service, nil
}
// startMetricsColletion creates the metrics collector for this run and
// starts it in the background.
// NOTE(review): the method name has a typo ("Colletion"); renaming would
// also touch its caller in run(), so it is left as-is here.
func (r *runner) startMetricsColletion(ctx context.Context) {
	// TODO collect agent hosts metrics using system integration
	r.mcollector = newCollector(
		r.svcInfo,
		r.options.BenchName,
		*r.scenario,
		r.options.ESAPI,
		r.options.ESMetricsAPI,
		r.options.MetricsInterval,
		r.runtimeDataStream,
		r.pipelinePrefix,
	)
	r.mcollector.start(ctx)
}
// collectAndSummarizeMetrics stops the metrics collector and returns a
// summary of everything it gathered during the run.
func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
	r.mcollector.stop()
	return r.mcollector.summarize()
}
// deleteDataStreamDocs removes all documents from the given data stream via
// a match_all delete-by-query. A 404 response is treated as success: it
// means the data is already gone.
func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error {
	query := strings.NewReader(`{ "query": { "match_all": {} } }`)
	resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, query,
		r.options.ESAPI.DeleteByQuery.WithContext(ctx),
	)
	if err != nil {
		return fmt.Errorf("failed to delete docs for data stream %s: %w", dataStream, err)
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusNotFound:
		// Unavailable index is ok, this means that data is already not there.
		return nil
	case resp.IsError():
		return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
	}

	return nil
}
// createBenchmarkPolicy creates a dedicated agent policy (namespace "ep")
// for this benchmark run, attaches a package policy for the scenario's data
// stream, and registers a teardown handler that deletes both.
func (r *runner) createBenchmarkPolicy(ctx context.Context, pkgManifest *packages.PackageManifest) (*kibana.Policy, error) {
	// Configure package (single data stream) via Ingest Manager APIs.
	logger.Debug("creating benchmark policy...")
	benchTime := time.Now().Format("20060102T15:04:05Z")
	p := kibana.Policy{
		Name:              fmt.Sprintf("ep-bench-%s-%s", r.options.BenchName, benchTime),
		Description:       fmt.Sprintf("policy created by elastic-package for benchmark %s", r.options.BenchName),
		Namespace:         "ep",
		MonitoringEnabled: []string{"logs", "metrics"},
	}

	// Assign the data_output_id to the agent policy to configure the output to logstash. The value is inferred from stack/_static/kibana.yml.tmpl
	if r.options.Profile.Config("stack.logstash_enabled", "false") == "true" {
		p.DataOutputID = "fleet-logstash-output"
	}

	policy, err := r.options.KibanaClient.CreatePolicy(ctx, p)
	if err != nil {
		return nil, err
	}

	packagePolicy, err := r.createPackagePolicy(ctx, pkgManifest, policy)
	if err != nil {
		return nil, err
	}

	// Delete the package policy first, then the agent policy it belongs to.
	r.deletePolicyHandler = func(ctx context.Context) error {
		var merr multierror.Error

		logger.Debug("deleting benchmark package policy...")
		if err := r.options.KibanaClient.DeletePackagePolicy(ctx, *packagePolicy); err != nil {
			merr = append(merr, fmt.Errorf("error cleaning up benchmark package policy: %w", err))
		}

		logger.Debug("deleting benchmark policy...")
		if err := r.options.KibanaClient.DeletePolicy(ctx, policy.ID); err != nil {
			merr = append(merr, fmt.Errorf("error cleaning up benchmark policy: %w", err))
		}

		if len(merr) > 0 {
			return merr
		}
		return nil
	}

	return policy, nil
}
// createPackagePolicy builds and creates the package policy that enables the
// scenario's input and data stream on the given agent policy. Empty scenario
// fields (version, package, policy template) are defaulted from the package
// manifest; note that this mutates r.scenario.
func (r *runner) createPackagePolicy(ctx context.Context, pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) {
	logger.Debug("creating package policy...")

	if r.scenario.Version == "" {
		r.scenario.Version = pkgManifest.Version
	}

	if r.scenario.Package == "" {
		r.scenario.Package = pkgManifest.Name
	}

	if r.scenario.PolicyTemplate == "" {
		// Default to the first policy template declared by the package.
		r.scenario.PolicyTemplate = pkgManifest.PolicyTemplates[0].Name
	}

	pp := kibana.PackagePolicy{
		Namespace: "ep",
		PolicyID:  p.ID,
		Force:     true,
		Inputs: map[string]kibana.PackagePolicyInput{
			// Input key convention: <policy template>-<input>.
			fmt.Sprintf("%s-%s", r.scenario.PolicyTemplate, r.scenario.Input): {
				Enabled: true,
				Vars:    r.scenario.Vars,
				Streams: map[string]kibana.PackagePolicyStream{
					// Stream key convention: <package>.<data stream>.
					fmt.Sprintf("%s.%s", pkgManifest.Name, r.scenario.DataStream.Name): {
						Enabled: true,
						Vars:    r.scenario.DataStream.Vars,
					},
				},
			},
		},
	}
	pp.Package.Name = pkgManifest.Name
	pp.Package.Version = r.scenario.Version

	policy, err := r.options.KibanaClient.CreatePackagePolicy(ctx, pp)
	if err != nil {
		return nil, err
	}

	return policy, nil
}
// initializeGenerator builds the corpus generator from the scenario's
// config, fields and template definitions. The template type selects the
// implementation: "gotext" uses the Go text/template flavor, while "" or
// "placeholder" (and, after a debug log, any unknown value) use the custom
// placeholder template.
func (r *runner) initializeGenerator(ctx context.Context) (genlib.Generator, error) {
	totEvents := r.scenario.Corpora.Generator.TotalEvents

	config, err := r.getGeneratorConfig()
	if err != nil {
		return nil, err
	}

	fields, err := r.getGeneratorFields(ctx)
	if err != nil {
		return nil, err
	}

	tpl, err := r.getGeneratorTemplate()
	if err != nil {
		return nil, err
	}

	// Seed the generator's clock and RNG from wall-clock time.
	genlib.InitGeneratorTimeNow(time.Now())
	genlib.InitGeneratorRandSeed(time.Now().UnixNano())

	var generator genlib.Generator
	switch r.scenario.Corpora.Generator.Template.Type {
	default:
		// Unknown types fall through to the placeholder implementation.
		logger.Debugf("unknown generator template type %q, defaulting to \"placeholder\"", r.scenario.Corpora.Generator.Template.Type)
		fallthrough
	case "", "placeholder":
		generator, err = genlib.NewGeneratorWithCustomTemplate(tpl, *config, fields, totEvents)
	case "gotext":
		generator, err = genlib.NewGeneratorWithTextTemplate(tpl, *config, fields, totEvents)
	}
	if err != nil {
		return nil, err
	}

	return generator, nil
}
// getGeneratorConfig loads the corpus generator configuration, either from
// the file referenced by the scenario (relative to the benchmark path, with
// environment variables expanded) or from the raw YAML embedded in the
// scenario itself. With neither present, an empty config is loaded.
func (r *runner) getGeneratorConfig() (*config.Config, error) {
	var raw []byte

	switch {
	case r.scenario.Corpora.Generator.Config.Path != "":
		path := filepath.Clean(filepath.Join(r.options.BenchPath, r.scenario.Corpora.Generator.Config.Path))
		path = os.ExpandEnv(path)
		if _, err := os.Stat(path); err != nil {
			return nil, fmt.Errorf("can't find config file %s: %w", path, err)
		}
		contents, err := os.ReadFile(path)
		if err != nil {
			return nil, fmt.Errorf("can't read config file %s: %w", path, err)
		}
		raw = contents
	case len(r.scenario.Corpora.Generator.Config.Raw) > 0:
		contents, err := yaml.Marshal(r.scenario.Corpora.Generator.Config.Raw)
		if err != nil {
			return nil, fmt.Errorf("can't parse raw generator config: %w", err)
		}
		raw = contents
	}

	cfg, err := config.LoadConfigFromYaml(raw)
	if err != nil {
		return nil, fmt.Errorf("can't get generator config: %w", err)
	}

	return &cfg, nil
}
// getGeneratorFields loads the fields definition for the corpus generator,
// either from the file referenced by the scenario (relative to the benchmark
// path, with environment variables expanded) or from the raw YAML embedded
// in the scenario.
func (r *runner) getGeneratorFields(ctx context.Context) (fields.Fields, error) {
	var (
		data []byte
		err  error
	)

	if r.scenario.Corpora.Generator.Fields.Path != "" {
		fieldsPath := filepath.Clean(filepath.Join(r.options.BenchPath, r.scenario.Corpora.Generator.Fields.Path))
		fieldsPath = os.ExpandEnv(fieldsPath)
		if _, err := os.Stat(fieldsPath); err != nil {
			return nil, fmt.Errorf("can't find fields file %s: %w", fieldsPath, err)
		}
		data, err = os.ReadFile(fieldsPath)
		if err != nil {
			return nil, fmt.Errorf("can't read fields file %s: %w", fieldsPath, err)
		}
	} else if len(r.scenario.Corpora.Generator.Fields.Raw) > 0 {
		// Fix: marshal the raw *fields* definition. This previously
		// marshaled Config.Raw, which silently ignored any inline fields
		// definition provided by the scenario.
		data, err = yaml.Marshal(r.scenario.Corpora.Generator.Fields.Raw)
		if err != nil {
			return nil, fmt.Errorf("can't parse raw generator fields: %w", err)
		}
	}

	fields, err := fields.LoadFieldsWithTemplateFromString(ctx, string(data))
	if err != nil {
		return nil, fmt.Errorf("could not load fields yaml: %w", err)
	}

	return fields, nil
}
// getGeneratorTemplate loads the corpus template, either from the file
// referenced by the scenario (relative to the benchmark path, with
// environment variables expanded) or from the raw template embedded in the
// scenario. It returns nil when neither is set.
func (r *runner) getGeneratorTemplate() ([]byte, error) {
	tpl := r.scenario.Corpora.Generator.Template

	if tpl.Path == "" {
		if len(tpl.Raw) > 0 {
			return []byte(tpl.Raw), nil
		}
		return nil, nil
	}

	tplPath := filepath.Clean(filepath.Join(r.options.BenchPath, tpl.Path))
	tplPath = os.ExpandEnv(tplPath)
	if _, err := os.Stat(tplPath); err != nil {
		return nil, fmt.Errorf("can't find template file %s: %w", tplPath, err)
	}

	data, err := os.ReadFile(tplPath)
	if err != nil {
		return nil, fmt.Errorf("can't read template file %s: %w", tplPath, err)
	}

	return data, nil
}
// runGenerator writes the generated corpus to a temporary file in destDir,
// one event per line, records the file path for the report, and registers a
// cleanup handler that removes the file on teardown.
func (r *runner) runGenerator(destDir string) error {
	f, err := os.CreateTemp(destDir, "corpus-*")
	if err != nil {
		return err
	}
	defer f.Close()

	if err := f.Chmod(os.ModePerm); err != nil {
		return err
	}

	var buf bytes.Buffer
	var corpusDocsCount uint64
	for {
		err := r.generator.Emit(&buf)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		// TODO: this should be taken care of by the corpus generator tool,
		// once it will be done let's remove this.
		// Events must be newline-delimited, so strip embedded newlines.
		// (Previously a strings.NewReplacer was constructed on every
		// iteration; ReplaceAll does the same work in a single pass.)
		event := strings.ReplaceAll(buf.String(), "\n", "")
		if _, err = f.WriteString(event); err != nil {
			return err
		}
		if _, err = f.WriteString("\n"); err != nil {
			return err
		}

		buf.Reset()
		corpusDocsCount++
	}

	r.corporaFile = f.Name()
	r.clearCorporaHandler = func(ctx context.Context) error {
		return os.Remove(r.corporaFile)
	}

	return r.generator.Close()
}
// checkEnrolledAgents polls Kibana (every 5s, up to 5 minutes) until at
// least one eligible agent (see filterAgents) is enrolled, and returns the
// eligible agents.
func (r *runner) checkEnrolledAgents(ctx context.Context) ([]kibana.Agent, error) {
	var agents []kibana.Agent

	enrolled, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		allAgents, err := r.options.KibanaClient.ListAgents(ctx)
		if err != nil {
			return false, fmt.Errorf("could not list agents: %w", err)
		}

		agents = filterAgents(allAgents)
		if len(agents) == 0 {
			return false, nil // selected agents are unavailable yet
		}

		return true, nil
	}, 5*time.Second, 5*time.Minute)
	if err != nil {
		return nil, fmt.Errorf("agent enrollment failed: %w", err)
	}
	if !enrolled {
		return nil, errors.New("no agent enrolled in time")
	}

	return agents, nil
}
// waitUntilBenchmarkFinishes polls the data stream's document count every 5s
// until the benchmark is considered done: either the configured benchmark
// time period elapses, or (when no period is set) the count is non-zero and
// stopped growing between two consecutive polls. Returns false when the
// overall wait timeout expires first.
func (r *runner) waitUntilBenchmarkFinishes(ctx context.Context) (bool, error) {
	logger.Debug("checking for all data in data stream...")
	var benchTime *time.Timer
	if r.scenario.BenchmarkTimePeriod > 0 {
		benchTime = time.NewTimer(r.scenario.BenchmarkTimePeriod)
	}

	oldHits := 0
	// NOTE(review): dereferences r.scenario.WaitForDataTimeout without a nil
	// check — presumably it is defaulted when the scenario is read; confirm
	// in readConfig.
	return wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		var err error
		hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream)
		if hits == 0 {
			return false, err
		}

		// "Settled" means the count did not change since the previous poll.
		ret := hits == oldHits
		if hits != oldHits {
			oldHits = hits
		}

		// A fixed benchmark time period takes precedence over the
		// count-settled heuristic.
		if benchTime != nil {
			select {
			case <-benchTime.C:
				return true, err
			default:
				return false, err
			}
		}

		return ret, err
	}, 5*time.Second, *r.scenario.WaitForDataTimeout)
}
// enrollAgents waits for eligible agents to be enrolled and assigns the
// benchmark policy to each of them. It registers a teardown handler that
// reassigns every agent's original policy.
func (r *runner) enrollAgents(ctx context.Context) error {
	agents, err := r.checkEnrolledAgents(ctx)
	if err != nil {
		return fmt.Errorf("can't check enrolled agents: %w", err)
	}

	handlers := make([]func(context.Context) error, len(agents))
	for i, agent := range agents {
		// Shadow the loop variable: the reset handler below must capture
		// this iteration's agent. Required under pre-Go 1.22 loop
		// semantics; harmless no-op on newer toolchains.
		agent := agent
		origPolicy := kibana.Policy{
			ID:       agent.PolicyID,
			Revision: agent.PolicyRevision,
		}
		// Assign policy to agent
		handlers[i] = func(ctx context.Context) error {
			logger.Debug("reassigning original policy back to agent...")
			if err := r.options.KibanaClient.AssignPolicyToAgent(ctx, agent, origPolicy); err != nil {
				return fmt.Errorf("error reassigning original policy to agent %s: %w", agent.ID, err)
			}
			return nil
		}

		policyWithDataStream, err := r.options.KibanaClient.GetPolicy(ctx, r.benchPolicy.ID)
		if err != nil {
			return fmt.Errorf("could not read the policy with data stream: %w", err)
		}

		logger.Debug("assigning package data stream to agent...")
		if err := r.options.KibanaClient.AssignPolicyToAgent(ctx, agent, *policyWithDataStream); err != nil {
			return fmt.Errorf("could not assign policy to agent: %w", err)
		}
	}

	// Run all reset handlers on teardown, collecting every error.
	r.resetAgentPolicyHandler = func(ctx context.Context) error {
		var merr multierror.Error
		for _, h := range handlers {
			if err := h(ctx); err != nil {
				merr = append(merr, err)
			}
		}
		if len(merr) == 0 {
			return nil
		}
		return merr
	}

	return nil
}
// reindexData will read all data generated during the benchmark and will reindex it to the metrisctore
// (a no-op unless the reindex option is set; requires the metricstore client).
func (r *runner) reindexData(ctx context.Context) error {
	if !r.options.ReindexData {
		return nil
	}
	if r.options.ESMetricsAPI == nil {
		return errors.New("the option to reindex data is set, but the metricstore was not initialized")
	}

	logger.Debug("starting reindexing of data...")

	logger.Debug("getting original mappings...")
	// Get the mapping from the source data stream
	mappingRes, err := r.options.ESAPI.Indices.GetMapping(
		r.options.ESAPI.Indices.GetMapping.WithContext(ctx),
		r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream),
	)
	if err != nil {
		return fmt.Errorf("error getting mapping: %w", err)
	}
	defer mappingRes.Body.Close()
	if mappingRes.IsError() {
		return fmt.Errorf("error getting mapping: %s", mappingRes)
	}

	body, err := io.ReadAll(mappingRes.Body)
	if err != nil {
		return fmt.Errorf("error reading mapping body: %w", err)
	}

	// The response maps backing index name -> {"mappings": ...}; exactly one
	// entry is expected for the data stream.
	mappings := map[string]struct {
		Mappings json.RawMessage
	}{}

	if err := json.Unmarshal(body, &mappings); err != nil {
		return fmt.Errorf("error unmarshaling mappings: %w", err)
	}

	if len(mappings) != 1 {
		return fmt.Errorf("exactly 1 mapping was expected, got %d", len(mappings))
	}

	var mapping string
	for _, v := range mappings {
		mapping = string(v.Mappings)
	}

	// Create the destination index with the source mappings and no replicas.
	reader := bytes.NewReader(
		[]byte(fmt.Sprintf(`{
			"settings": {"number_of_replicas":0},
			"mappings": %s
		}`, mapping)),
	)

	indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.svcInfo.Test.RunID)
	logger.Debugf("creating %s index in metricstore...", indexName)

	createRes, err := r.options.ESMetricsAPI.Indices.Create(
		indexName,
		r.options.ESMetricsAPI.Indices.Create.WithContext(ctx),
		r.options.ESMetricsAPI.Indices.Create.WithBody(reader),
	)
	if err != nil {
		return fmt.Errorf("could not create index: %w", err)
	}
	defer createRes.Body.Close()

	if createRes.IsError() {
		return errors.New("got a response error while creating index")
	}

	bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`)

	logger.Debug("starting scrolling of events...")
	resp, err := r.options.ESAPI.Search(
		r.options.ESAPI.Search.WithContext(ctx),
		r.options.ESAPI.Search.WithIndex(r.runtimeDataStream),
		r.options.ESAPI.Search.WithBody(bodyReader),
		r.options.ESAPI.Search.WithScroll(time.Minute),
		r.options.ESAPI.Search.WithSize(10000),
	)
	if err != nil {
		return fmt.Errorf("error executing search: %w", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		return fmt.Errorf("failed to search events in data stream %s: %s", r.runtimeDataStream, resp.String())
	}

	// Iterate through the search results using the Scroll API
	// NOTE(review): each iteration decodes from the same resp.Body, while
	// bulkMetrics issues the scroll request and discards its response body.
	// It looks like pages after the first are never decoded here — confirm
	// multi-page reindexing against an integration run before relying on it.
	for {
		var sr searchResponse
		if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil {
			return fmt.Errorf("error decoding search response: %w", err)
		}

		if sr.Error != nil {
			return fmt.Errorf("error searching for documents: %s", sr.Error.Reason)
		}

		if len(sr.Hits) == 0 {
			break
		}

		err := r.bulkMetrics(ctx, indexName, sr)
		if err != nil {
			return err
		}
	}

	logger.Debug("reindexing operation finished")
	return nil
}
// searchResponse is the subset of an Elasticsearch search/scroll response
// consumed by the reindexing code.
// NOTE(review): Elasticsearch nests the hit array under "hits.hits", while
// Hits here is tagged "hits" at the top level — confirm this decodes the
// hit list as intended.
type searchResponse struct {
	Error *struct {
		Reason string `json:"reason"`
	} `json:"error"`
	ScrollID string `json:"_scroll_id"`
	Hits     []struct {
		ID     string                 `json:"_id"`
		Source map[string]interface{} `json:"_source"`
	} `json:"hits"`
}
// bulkMetrics enriches each hit with benchmark metadata and bulk-indexes the
// batch into indexName on the metricstore, then advances the scroll cursor.
// NOTE(review): the scroll response obtained at the end is closed here and
// never returned to the caller, so its hits appear to be discarded — confirm
// whether multi-page reindexing works as intended (see reindexData).
func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error {
	var bulkBodyBuilder strings.Builder
	for _, hit := range sr.Hits {
		// NDJSON bulk format: action line followed by the document source.
		bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
		enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
		src, err := json.Marshal(enriched)
		if err != nil {
			return fmt.Errorf("error decoding _source: %w", err)
		}
		bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
	}

	logger.Debugf("bulk request of %d events...", len(sr.Hits))

	resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()),
		r.options.ESMetricsAPI.Bulk.WithContext(ctx),
	)
	if err != nil {
		return fmt.Errorf("error performing the bulk index request: %w", err)
	}
	// This defer binds to the bulk response; the scroll response below gets
	// its own defer.
	defer resp.Body.Close()

	if resp.IsError() {
		return fmt.Errorf("error performing the bulk index request: %s", resp.String())
	}

	if sr.ScrollID == "" {
		return errors.New("error getting scroll ID")
	}

	resp, err = r.options.ESAPI.Scroll(
		r.options.ESAPI.Scroll.WithContext(ctx),
		r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
		r.options.ESAPI.Scroll.WithScroll(time.Minute),
	)
	if err != nil {
		return fmt.Errorf("error executing scroll: %s", err)
	}
	defer resp.Body.Close()

	if resp.IsError() {
		return fmt.Errorf("error executing scroll: %s", resp.String())
	}

	return nil
}
// benchMeta is the metadata attached to every reindexed event under the
// "benchmark_metadata" key (see enrichEventWithBenchmarkMetadata).
type benchMeta struct {
	Info struct {
		Benchmark string `json:"benchmark"`
		RunID     string `json:"run_id"`
	} `json:"info"`
	// Parameters holds the full scenario definition for the run. The
	// singular "parameter" tag is what gets serialized into stored
	// documents; changing it would change the metricstore output.
	Parameters scenario `json:"parameter"`
}
// enrichEventWithBenchmarkMetadata attaches the benchmark name, run ID and
// scenario parameters to the event under the "benchmark_metadata" key, and
// returns the same (mutated) map.
func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[string]interface{} {
	meta := benchMeta{Parameters: *r.scenario}
	meta.Info.Benchmark = r.options.BenchName
	meta.Info.RunID = r.svcInfo.Test.RunID
	e["benchmark_metadata"] = meta
	return e
}
// filterAgents returns the agents eligible to run the benchmark, skipping
// agents that do not yet have a policy revision and (best effort) agents
// that look like fleet servers.
func filterAgents(allAgents []kibana.Agent) []kibana.Agent {
	var filtered []kibana.Agent
	for _, agent := range allAgents {
		// For some reason Kibana doesn't always return
		// a valid policy revision (eventually it will be present and valid)
		if agent.PolicyRevision == 0 {
			continue
		}
		// best effort to ignore fleet server agents
		if agent.LocalMetadata.Host.Name == "docker-fleet-server" ||
			agent.PolicyID == "fleet-server-policy" ||
			agent.PolicyID == "policy-elastic-agent-on-cloud" {
			continue
		}
		filtered = append(filtered, agent)
	}
	return filtered
}