// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package system

import (
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/elastic/elastic-package/internal/benchrunner/runners/common"
"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/servicedeployer"
)
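
// collector periodically samples ingest, node, and data stream metrics from
// the Elasticsearch cluster under test and, when a metricstore is configured,
// publishes each sample as an event to a per-run index.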
type collector struct {
svcInfo servicedeployer.ServiceInfo
metadata benchMeta
scenario scenario
interval time.Duration
esAPI *elasticsearch.API
metricsAPI *elasticsearch.API
datastream string
pipelinePrefix string
wg sync.WaitGroup
stopped atomic.Bool
stopC chan struct{}
tick *time.Ticker
startIngestMetrics map[string]ingest.PipelineStatsMap
endIngestMetrics map[string]ingest.PipelineStatsMap
startMetrics metrics
endMetrics metrics
diskUsage map[string]ingest.DiskUsage
startTotalHits int
endTotalHits int
}
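
// metrics is a single point-in-time sample of cluster-wide stats.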
type metrics struct {
ts int64
dsMetrics *ingest.DataStreamStats
nMetrics *ingest.NodesStats
}
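
// metricsSummary aggregates the deltas between the first and last samples of
// a benchmark run.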
type metricsSummary struct {
ClusterName string
Nodes int
RunID string
CollectionStartTs int64
CollectionEndTs int64
DataStreamStats *ingest.DataStreamStats
IngestPipelineStats map[string]ingest.PipelineStatsMap
DiskUsage map[string]ingest.DiskUsage
TotalHits int
NodesStats map[string]ingest.NodeStats
}
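
// newCollector builds a collector for a benchmark run. A minimal usage sketch
// (the variable names are illustrative; the real call site lives elsewhere in
// the system runner):
//
//	c := newCollector(svcInfo, benchName, scenario, esAPI, metricsAPI, time.Second, dataStream, pipelinePrefix)
//	c.start(ctx)
//	defer c.stop()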
func newCollector(
svcInfo servicedeployer.ServiceInfo,
benchName string,
scenario scenario,
esAPI, metricsAPI *elasticsearch.API,
interval time.Duration,
datastream, pipelinePrefix string,
) *collector {
meta := benchMeta{Parameters: scenario}
meta.Info.Benchmark = benchName
meta.Info.RunID = svcInfo.Test.RunID
return &collector{
svcInfo: svcInfo,
interval: interval,
scenario: scenario,
metadata: meta,
esAPI: esAPI,
metricsAPI: metricsAPI,
datastream: datastream,
pipelinePrefix: pipelinePrefix,
stopC: make(chan struct{}),
}
}
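
// start launches the collection goroutine. On the first tick it waits for the
// data stream (and optional warmup period), records the baseline metrics, and
// then keeps publishing a sample per tick until stop is called.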
func (c *collector) start(ctx context.Context) {
c.tick = time.NewTicker(c.interval)
c.createMetricsIndex()
var once sync.Once
c.wg.Add(1)
go func() {
defer c.tick.Stop()
defer c.wg.Done()
for {
select {
case <-c.stopC:
// last collect before stopping
c.collectMetricsPreviousToStop(ctx)
c.publish(c.createEventsFromMetrics(c.endMetrics))
return
case <-c.tick.C:
once.Do(func() {
c.waitUntilReady()
c.startIngestMetrics = c.collectIngestMetrics()
c.startTotalHits = c.collectTotalHits(ctx)
c.startMetrics = c.collect()
c.publish(c.createEventsFromMetrics(c.startMetrics))
})
m := c.collect()
c.publish(c.createEventsFromMetrics(m))
}
}
}()
}
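
// stop signals the collection goroutine to take a final sample and waits for
// it to finish. It is safe to call stop more than once.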
func (c *collector) stop() {
if !c.stopped.CompareAndSwap(false, true) {
return
}
close(c.stopC)
c.wg.Wait()
}
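
// collect takes a point-in-time sample of node and data stream stats. Errors
// are logged and leave the corresponding field nil, so callers must tolerate
// partial samples.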
func (c *collector) collect() metrics {
m := metrics{
ts: time.Now().Unix(),
}
nstats, err := ingest.GetNodesStats(c.esAPI)
if err != nil {
logger.Debug(err)
} else {
m.nMetrics = nstats
}
dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream)
if err != nil {
logger.Debug(err)
} else {
m.dsMetrics = dsstats
}
return m
}
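
// publish indexes the given events into the metricstore, if one is configured.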
func (c *collector) publish(events [][]byte) {
if c.metricsAPI == nil {
return
}
for _, e := range events {
reqBody := bytes.NewReader(e)
resp, err := c.metricsAPI.Index(c.indexName(), reqBody)
if err != nil {
logger.Debugf("error indexing event: %v", err)
continue
}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			logger.Errorf("failed to read index response body: %v", err)
			continue
		}
		if resp.StatusCode != http.StatusCreated {
			logger.Errorf("error indexing event (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
		}
}
}
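
// metricsIndexBytes holds the mappings used to create the per-run metrics
// index in the metricstore.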
//go:embed metrics_index.json
var metricsIndexBytes []byte
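
// createMetricsIndex creates the per-run index in the metricstore where the
// collected samples are published.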
func (c *collector) createMetricsIndex() {
if c.metricsAPI == nil {
return
}
reader := bytes.NewReader(metricsIndexBytes)
logger.Debugf("creating %s index in metricstore...", c.indexName())
createRes, err := c.metricsAPI.Indices.Create(
c.indexName(),
c.metricsAPI.Indices.Create.WithBody(reader),
)
if err != nil {
logger.Debugf("could not create index: %v", err)
return
}
createRes.Body.Close()
if createRes.IsError() {
logger.Debug("got a response error while creating index: %s", createRes)
}
}
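
// indexName returns the metricstore index name for this run.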
func (c *collector) indexName() string {
return fmt.Sprintf("bench-metrics-%s-%s", c.datastream, c.svcInfo.Test.RunID)
}
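
// summarize computes the difference between the metrics collected at the
// start and at the end of the run.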
func (c *collector) summarize() (*metricsSummary, error) {
sum := metricsSummary{
RunID: c.svcInfo.Test.RunID,
IngestPipelineStats: make(map[string]ingest.PipelineStatsMap),
NodesStats: make(map[string]ingest.NodeStats),
DiskUsage: c.diskUsage,
TotalHits: c.endTotalHits - c.startTotalHits,
}
	if c.startMetrics.nMetrics != nil {
		sum.ClusterName = c.startMetrics.nMetrics.ClusterName
	}
	sum.CollectionStartTs = c.startMetrics.ts
	sum.CollectionEndTs = c.endMetrics.ts
	sum.DataStreamStats = c.endMetrics.dsMetrics
	if c.endMetrics.nMetrics != nil {
		sum.Nodes = len(c.endMetrics.nMetrics.Nodes)
	}
for node, endPStats := range c.endIngestMetrics {
startPStats, found := c.startIngestMetrics[node]
if !found {
logger.Debugf("node %s not found in initial metrics", node)
continue
}
sumStats := make(ingest.PipelineStatsMap)
for pname, endStats := range endPStats {
startStats, found := startPStats[pname]
if !found {
logger.Debugf("pipeline %s not found in node %s initial metrics", pname, node)
continue
}
sumStats[pname] = ingest.PipelineStats{
StatsRecord: ingest.StatsRecord{
Count: endStats.Count - startStats.Count,
Failed: endStats.Failed - startStats.Failed,
TimeInMillis: endStats.TimeInMillis - startStats.TimeInMillis,
},
Processors: make([]ingest.ProcessorStats, len(endStats.Processors)),
}
for i, endPr := range endStats.Processors {
startPr := startStats.Processors[i]
sumStats[pname].Processors[i] = ingest.ProcessorStats{
Type: endPr.Type,
Extra: endPr.Extra,
Conditional: endPr.Conditional,
Stats: ingest.StatsRecord{
Count: endPr.Stats.Count - startPr.Stats.Count,
Failed: endPr.Stats.Failed - startPr.Stats.Failed,
TimeInMillis: endPr.Stats.TimeInMillis - startPr.Stats.TimeInMillis,
},
}
}
}
sum.IngestPipelineStats[node] = sumStats
}
return &sum, nil
}
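
// waitUntilReady blocks until the benchmark data stream exists, then honors
// the scenario's warmup period before metric collection starts.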
func (c *collector) waitUntilReady() {
logger.Debug("waiting for datastream to be created...")
waitTick := time.NewTicker(time.Second)
defer waitTick.Stop()
readyLoop:
for {
select {
case <-c.stopC:
return
case <-waitTick.C:
}
dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream)
if err != nil {
logger.Debug(err)
}
if dsstats != nil {
break readyLoop
}
}
if c.scenario.WarmupTimePeriod > 0 {
logger.Debugf("waiting %s for warmup period", c.scenario.WarmupTimePeriod)
select {
case <-c.stopC:
return
case <-time.After(c.scenario.WarmupTimePeriod):
}
}
logger.Debug("metric collection starting...")
}
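
// collectIngestMetrics returns per-node stats for all ingest pipelines
// matching the configured prefix.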
func (c *collector) collectIngestMetrics() map[string]ingest.PipelineStatsMap {
ipMetrics, err := ingest.GetPipelineStatsByPrefix(c.esAPI, c.pipelinePrefix)
if err != nil {
logger.Debugf("could not get ingest pipeline metrics: %v", err)
return nil
}
return ipMetrics
}
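
// collectDiskUsage returns per-index disk usage for the benchmark data stream.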
func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage {
du, err := ingest.GetDiskUsage(c.esAPI, c.datastream)
if err != nil {
logger.Debugf("could not get disk usage metrics: %v", err)
return nil
}
return du
}
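
// collectMetricsPreviousToStop takes the final set of samples used by
// summarize: ingest pipeline stats, disk usage, total hits, and cluster stats.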
func (c *collector) collectMetricsPreviousToStop(ctx context.Context) {
c.endIngestMetrics = c.collectIngestMetrics()
c.diskUsage = c.collectDiskUsage()
c.endTotalHits = c.collectTotalHits(ctx)
c.endMetrics = c.collect()
}
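
// collectTotalHits counts the documents currently in the benchmark data
// stream; on error it logs the failure and returns the (possibly zero) count.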
func (c *collector) collectTotalHits(ctx context.Context) int {
totalHits, err := common.CountDocsInDataStream(ctx, c.esAPI, c.datastream)
if err != nil {
logger.Debugf("could not get total hits: %s", err)
}
return totalHits
}
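
// createEventsFromMetrics marshals one event per node plus one data stream
// event from the given sample, ready to be published to the metricstore.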
func (c *collector) createEventsFromMetrics(m metrics) [][]byte {
dsEvent := struct {
Timestamp int64 `json:"@timestamp"`
*ingest.DataStreamStats
Meta benchMeta `json:"benchmark_metadata"`
}{
		Timestamp:       m.ts * 1000, // s to ms
DataStreamStats: m.dsMetrics,
Meta: c.metadata,
}
type nEvent struct {
Ts int64 `json:"@timestamp"`
ClusterName string `json:"cluster_name"`
NodeName string `json:"node_name"`
*ingest.NodeStats
Meta benchMeta `json:"benchmark_metadata"`
}
	var nEvents []interface{}
	if m.nMetrics != nil {
		for node, stats := range m.nMetrics.Nodes {
			stats := stats // copy, so &stats below does not alias the loop variable (pre-Go 1.22 semantics)
			nEvents = append(nEvents, nEvent{
				Ts:          m.ts * 1000, // s to ms
				ClusterName: m.nMetrics.ClusterName,
				NodeName:    node,
				NodeStats:   &stats,
				Meta:        c.metadata,
			})
		}
	}
var events [][]byte
for _, e := range append(nEvents, dsEvent) {
b, err := json.Marshal(e)
if err != nil {
logger.Debugf("error marhsaling metrics event: %w", err)
continue
}
events = append(events, b)
}
return events
}