internal/benchrunner/runners/system/report.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package system

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/jedib0t/go-pretty/table"
	"github.com/jedib0t/go-pretty/text"

	"github.com/elastic/elastic-package/internal/benchrunner/reporters"
	"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
)

// report holds the scenario parameters and the metrics collected during a
// system benchmark run.
type report struct {
	Info struct {
		Benchmark            string
		Description          string
		RunID                string
		Package              string
		StartTs              int64
		EndTs                int64
		Duration             time.Duration
		GeneratedCorporaFile string
	}
	Parameters struct {
		PackageVersion      string
		Input               string
		Vars                map[string]interface{}
		DataStream          dataStream
		WarmupTimePeriod    time.Duration
		BenchmarkTimePeriod time.Duration
		WaitForDataTimeout  time.Duration
		Corpora             corpora
	}
	ClusterName         string
	Nodes               int
	DataStreamStats     *ingest.DataStreamStats
	IngestPipelineStats map[string]ingest.PipelineStatsMap
	DiskUsage           map[string]ingest.DiskUsage
	TotalHits           int
}

// createReport builds the benchmark report and wraps it in a human-readable
// reporter and a JSON file reporter.
func createReport(benchName, corporaFile string, s *scenario, sum *metricsSummary) (reporters.Reportable, error) {
	r := newReport(benchName, corporaFile, s, sum)

	human := reporters.NewReport(s.Package, reportHumanFormat(r))

	jsonBytes, err := reportJSONFormat(r)
	if err != nil {
		return nil, fmt.Errorf("rendering JSON report: %w", err)
	}
	jsonFile := reporters.NewFileReport(s.Package, fmt.Sprintf("system/%s/report.json", sum.RunID), jsonBytes)

	mr := reporters.NewMultiReport(s.Package, []reporters.Reportable{human, jsonFile})

	return mr, nil
}

// newReport populates a report from the scenario definition and the collected
// metrics summary.
func newReport(benchName, corporaFile string, s *scenario, sum *metricsSummary) *report {
	var report report
	report.Info.Benchmark = benchName
	report.Info.Description = s.Description
	report.Info.RunID = sum.RunID
	report.Info.Package = s.Package
	report.Info.StartTs = sum.CollectionStartTs
	report.Info.EndTs = sum.CollectionEndTs
	report.Info.Duration = time.Duration(sum.CollectionEndTs-sum.CollectionStartTs) * time.Second
	report.Info.GeneratedCorporaFile = corporaFile
	report.Parameters.PackageVersion = s.Version
	report.Parameters.Input = s.Input
	report.Parameters.Vars = s.Vars
	report.Parameters.DataStream = s.DataStream
	report.Parameters.WarmupTimePeriod = s.WarmupTimePeriod
	report.Parameters.BenchmarkTimePeriod = s.BenchmarkTimePeriod
	report.Parameters.WaitForDataTimeout = *s.WaitForDataTimeout
	report.Parameters.Corpora = s.Corpora
	report.ClusterName = sum.ClusterName
	report.Nodes = sum.Nodes
	report.DataStreamStats = sum.DataStreamStats
	report.IngestPipelineStats = sum.IngestPipelineStats
	report.DiskUsage = sum.DiskUsage
	report.TotalHits = sum.TotalHits
	return &report
}

// reportJSONFormat renders the report as indented JSON.
func reportJSONFormat(r *report) ([]byte, error) {
	b, err := json.MarshalIndent(r, "", "\t")
	if err != nil {
		return nil, err
	}
	return b, nil
}

// reportHumanFormat renders the report as a sequence of human-readable tables:
// run info, parameters, cluster info, data stream stats, per-index disk usage
// and per-node ingest pipeline stats.
func reportHumanFormat(r *report) []byte {
	var report strings.Builder
	report.WriteString(renderBenchmarkTable(
		"info",
		"benchmark", r.Info.Benchmark,
		"description", r.Info.Description,
		"run ID", r.Info.RunID,
		"package", r.Info.Package,
		"start ts (s)", r.Info.StartTs,
		"end ts (s)", r.Info.EndTs,
		"duration", r.Info.Duration,
		"generated corpora file", r.Info.GeneratedCorporaFile,
	) + "\n")

	pkvs := []interface{}{
		"package version", r.Parameters.PackageVersion,
		"input", r.Parameters.Input,
	}

	for k, v := range r.Parameters.Vars {
		pkvs = append(pkvs, fmt.Sprintf("vars.%s", k), v)
	}

	pkvs = append(pkvs, "data_stream.name", r.Parameters.DataStream.Name)

	for k, v := range r.Parameters.DataStream.Vars {
		pkvs = append(pkvs, fmt.Sprintf("data_stream.vars.%s", k), v)
	}

	pkvs = append(pkvs,
		"warmup time period", r.Parameters.WarmupTimePeriod,
		"benchmark time period", r.Parameters.BenchmarkTimePeriod,
		"wait for data timeout", r.Parameters.WaitForDataTimeout,
	)

	if r.Parameters.Corpora.Generator != nil {
		pkvs = append(pkvs,
			"corpora.generator.total_events", r.Parameters.Corpora.Generator.TotalEvents,
			"corpora.generator.template.path", r.Parameters.Corpora.Generator.Template.Path,
			"corpora.generator.template.raw", r.Parameters.Corpora.Generator.Template.Raw,
			"corpora.generator.template.type", r.Parameters.Corpora.Generator.Template.Type,
			"corpora.generator.config.path", r.Parameters.Corpora.Generator.Config.Path,
			"corpora.generator.config.raw", r.Parameters.Corpora.Generator.Config.Raw,
			"corpora.generator.fields.path", r.Parameters.Corpora.Generator.Fields.Path,
			"corpora.generator.fields.raw", r.Parameters.Corpora.Generator.Fields.Raw,
		)
	}

	if r.Parameters.Corpora.InputService != nil {
		pkvs = append(pkvs,
			"corpora.input_service.name", r.Parameters.Corpora.InputService.Name,
			"corpora.input_service.signal", r.Parameters.Corpora.InputService.Signal,
		)
	}

	report.WriteString(renderBenchmarkTable("parameters", pkvs...) + "\n")

	report.WriteString(renderBenchmarkTable(
		"cluster info",
		"name", r.ClusterName,
		"nodes", r.Nodes,
	) + "\n")

	report.WriteString(renderBenchmarkTable(
		"data stream stats",
		"data stream", r.DataStreamStats.DataStream,
		"approx total docs ingested", r.TotalHits,
		"backing indices", r.DataStreamStats.BackingIndices,
		"store size bytes", r.DataStreamStats.StoreSizeBytes,
		"maximum ts (ms)", r.DataStreamStats.MaximumTimestamp,
	) + "\n")

	for index, du := range r.DiskUsage {
		adu := du.AllFields
		report.WriteString(renderBenchmarkTable(
			fmt.Sprintf("disk usage for index %s (for all fields)", index),
			"total", humanize.Bytes(adu.TotalInBytes),
			"inverted_index.total", humanize.Bytes(adu.InvertedIndex.TotalInBytes),
			"inverted_index.stored_fields", humanize.Bytes(adu.StoredFieldsInBytes),
			"inverted_index.doc_values", humanize.Bytes(adu.DocValuesInBytes),
			"inverted_index.points", humanize.Bytes(adu.PointsInBytes),
			"inverted_index.norms", humanize.Bytes(adu.NormsInBytes),
			"inverted_index.term_vectors", humanize.Bytes(adu.TermVectorsInBytes),
			"inverted_index.knn_vectors", humanize.Bytes(adu.KnnVectorsInBytes),
		) + "\n")
	}

	for node, pStats := range r.IngestPipelineStats {
		for pipeline, stats := range pStats {
			if stats.Count == 0 {
				continue
			}
			kvs := []interface{}{
				"Totals", fmt.Sprintf(
					"Count: %d | Failed: %d | Time: %s",
					stats.Count,
					stats.Failed,
					time.Duration(stats.TimeInMillis)*time.Millisecond,
				),
			}
			for _, procStats := range stats.Processors {
				str := fmt.Sprintf(
					"Count: %d | Failed: %d | Time: %s",
					procStats.Stats.Count,
					procStats.Stats.Failed,
					time.Duration(procStats.Stats.TimeInMillis)*time.Millisecond,
				)
				kvs = append(kvs, fmt.Sprintf("%s (%s)", procStats.Type, procStats.Extra), str)
			}
			report.WriteString(renderBenchmarkTable(
				fmt.Sprintf("pipeline %s stats in node %s", pipeline, node),
				kvs...,
			) + "\n")
		}
	}

	return []byte(report.String())
}

// renderBenchmarkTable renders a titled two-column table from alternating
// key/value arguments, right-aligning the value column.
func renderBenchmarkTable(title string, kv ...interface{}) string {
	t := table.NewWriter()
	t.SetStyle(table.StyleRounded)
	t.SetTitle(title)
	t.SetColumnConfigs([]table.ColumnConfig{
		{
			Number: 2,
			Align:  text.AlignRight,
		},
	})
	for i := 0; i < len(kv)-1; i += 2 {
		t.AppendRow(table.Row{kv[i], kv[i+1]})
	}
	return t.Render()
}
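
// exampleRenderBenchmarkTable is an illustrative sketch, not part of the
// original file: it shows how renderBenchmarkTable is called throughout this
// file, with a title followed by alternating key/value pairs rendered as a
// two-column, rounded-style table. The title and values below ("cluster
// info", "my-cluster", 3) are made-up placeholders for demonstration only.
func exampleRenderBenchmarkTable() string {
	return renderBenchmarkTable(
		"cluster info",
		"name", "my-cluster",
		"nodes", 3,
	)
}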