systemtest/benchtest/expvar/expvar.go (163 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package expvar
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"runtime"
)
// cloudProxyHeader is the response header read by doExpvar to tell apart the
// server instances answering behind a proxy/load balancer; queryExpvar keys
// its per-instance snapshots by this header's value.
const cloudProxyHeader = "x-found-handling-instance"
// expvar is the subset of an APM Server /debug/vars JSON document that the
// benchmarks read. The Go runtime memory statistics are decoded from the
// nested "memstats" object (encoding/json treats an embedded field that has
// a JSON tag name as a named field). The fields of the other embedded stats
// structs are promoted, so they are decoded from top-level keys named by
// their own JSON tags.
//
// TODO(axw) reuse apmservertest.Expvar, expose function(s) for fetching
// from APM Server given a URL.
type expvar struct {
runtime.MemStats `json:"memstats"`
LibbeatStats
ElasticResponseStats
OTLPResponseStats
TailSamplingStats
// UncompressedBytes holds the number of bytes of uncompressed
// data that the server has read from the Elastic APM events
// intake endpoint.
//
// TODO(axw) instrument the net/http.Transport to count bytes
// transferred, so we can measure for OTLP and Jaeger too.
// Alternatively, implement an in-memory reverse proxy that
// does the same.
UncompressedBytes int64 `json:"apm-server.decoder.uncompressed.bytes"`
// AvailableBulkRequests is decoded from the
// "output.elasticsearch.bulk_requests.available" key; queryExpvar sums it
// across all server instances it observed.
AvailableBulkRequests int64 `json:"output.elasticsearch.bulk_requests.available"`
}
// ElasticResponseStats holds the Elastic APM intake values decoded from the
// "apm-server.server.*" and "apm-server.processor.*" expvar keys named in the
// JSON tags below. When several server instances are queried, each field is
// summed across instances by aggregateResponseStats.
type ElasticResponseStats struct {
// Server response totals.
TotalElasticResponses int64 `json:"apm-server.server.response.count"`
ErrorElasticResponses int64 `json:"apm-server.server.response.errors.count"`
// Per-event-type processor "transformations" values.
TransactionsProcessed int64 `json:"apm-server.processor.transaction.transformations"`
SpansProcessed int64 `json:"apm-server.processor.span.transformations"`
MetricsProcessed int64 `json:"apm-server.processor.metric.transformations"`
ErrorsProcessed int64 `json:"apm-server.processor.error.transformations"`
// Stream intake outcomes: accepted events and rejection reasons.
IntakeEventsAccepted int64 `json:"apm-server.processor.stream.accepted"`
IntakeEventsErrorsInvalid int64 `json:"apm-server.processor.stream.errors.invalid"`
IntakeEventsErrorsTooLarge int64 `json:"apm-server.processor.stream.errors.toolarge"`
}
// OTLPResponseStats holds the OTLP gRPC response values for the metrics and
// traces services, decoded from the "apm-server.otlp.grpc.*" expvar keys named
// in the JSON tags below. aggregateOTLPResponseStats sums them across
// server instances.
type OTLPResponseStats struct {
TotalOTLPMetricsResponses int64 `json:"apm-server.otlp.grpc.metrics.response.count"`
ErrorOTLPMetricsResponses int64 `json:"apm-server.otlp.grpc.metrics.response.errors.count"`
TotalOTLPTracesResponses int64 `json:"apm-server.otlp.grpc.traces.response.count"`
ErrorOTLPTracesResponses int64 `json:"apm-server.otlp.grpc.traces.response.errors.count"`
}
// LibbeatStats holds libbeat output-pipeline and beat process values decoded
// from the expvar keys named in the JSON tags below. aggregateLibbeatStats
// sums them across server instances.
type LibbeatStats struct {
// ActiveEvents is polled by WaitUntilServerInactive, which keeps querying
// until the value summed across instances is no longer positive.
ActiveEvents int64 `json:"libbeat.output.events.active"`
TotalEvents int64 `json:"libbeat.output.events.total"`
Goroutines int64 `json:"beat.runtime.goroutines"`
// RSSMemoryBytes is decoded from "beat.memstats.rss" (bytes, per the name).
RSSMemoryBytes int64 `json:"beat.memstats.rss"`
}
// TailSamplingStats holds tail-based sampling storage sizes decoded from the
// "apm-server.sampling.tail.storage.*" expvar keys. The unit is presumably
// bytes — NOTE(review): confirm against the APM Server metric definitions.
// aggregateTailSamplingStats sums them across server instances.
type TailSamplingStats struct {
TBSLsmSize int64 `json:"apm-server.sampling.tail.storage.lsm_size"`
TBSVlogSize int64 `json:"apm-server.sampling.tail.storage.value_log_size"`
}
// queryExpvar fetches srv+"/debug/vars" repeatedly and stores in out the sum
// of the latest snapshot reported by each distinct server instance.
//
// When srv sits behind a proxy, successive requests may be served by
// different APM Server instances. Responses are keyed by the value of the
// cloudProxyHeader response header, so each instance contributes only its
// most recent snapshot. Requests continue until the number of repeat
// responses exceeds timesSeen times the number of instances seen so far.
// Without the header every response shares the empty key and collapses into
// a single entry.
//
// ctx is attached to every HTTP request, so cancelling it aborts the query.
// On any request or decode error that error is returned and out is left
// unmodified.
func queryExpvar(ctx context.Context, out *expvar, srv string) error {
	// NewRequestWithContext binds ctx to the request. Calling req.WithContext
	// and discarding its result would leave the request without ctx, so
	// cancellation and deadlines would never reach the HTTP client.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv+"/debug/vars", nil)
	if err != nil {
		return err
	}
	req.Header.Set("Accept", "application/json")

	agg := make(map[string]expvar)
	var seen int
	const timesSeen = 10
	for {
		var tmp expvar
		// NOTE(marclop) we could also aggregate based on the beats ephemeral id.
		// However, that has the drawback of not being able to replace a node
		// when it restarts.
		// For example, if we push too hard and it falls out of the LB, the beats
		// ID will change if the APM Server process is restarted, resulting in
		// some stats never being updated.
		id, err := doExpvar(req, &tmp)
		if err != nil {
			return err
		}
		if _, ok := agg[id]; ok {
			seen++
		}
		// Keep only the latest snapshot per instance.
		agg[id] = tmp
		// We must ensure that we have made enough requests to the remote APM
		// Server to guarantee with a degree of certainty that all the remote
		// APM Servers metrics have been queried.
		if seen > timesSeen*len(agg) {
			break
		}
	}

	var result expvar
	for _, s := range agg {
		aggregateMemStats(s.MemStats, &result.MemStats)
		aggregateResponseStats(s.ElasticResponseStats, &result.ElasticResponseStats)
		aggregateOTLPResponseStats(s.OTLPResponseStats, &result.OTLPResponseStats)
		aggregateLibbeatStats(s.LibbeatStats, &result.LibbeatStats)
		aggregateTailSamplingStats(s.TailSamplingStats, &result.TailSamplingStats)
		result.UncompressedBytes += s.UncompressedBytes
		result.AvailableBulkRequests += s.AvailableBulkRequests
	}
	*out = result
	return nil
}
// doExpvar sends req with http.DefaultClient and decodes the JSON response
// body into out.
//
// It returns the value of the cloudProxyHeader response header, which is
// empty when the header is absent. A non-200 response is returned as an error
// without decoding the body. This prevents an error payload that happens to be
// valid JSON from being treated as an all-zero snapshot. Such a snapshot could,
// for example, report zero active events.
func doExpvar(req *http.Request, out *expvar) (string, error) {
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	id := resp.Header.Get(cloudProxyHeader)
	if resp.StatusCode != http.StatusOK {
		return id, fmt.Errorf("querying %s: unexpected status %s", req.URL, resp.Status)
	}
	return id, json.NewDecoder(resp.Body).Decode(out)
}
// WaitUntilServerInactive repeatedly queries the expvar endpoint of server
// and returns once one of the following happens:
//   - the active output events, summed over all observed instances, are no
//     longer positive (returns nil);
//   - a query fails (returns that error);
//   - ctx is done (returns ctx.Err()).
func WaitUntilServerInactive(ctx context.Context, server string) error {
	// Seed a positive active count so that at least one query is issued.
	var current expvar
	current.ActiveEvents = 1

	for current.ActiveEvents > 0 {
		if cerr := ctx.Err(); cerr != nil {
			return cerr
		}
		if qerr := queryExpvar(ctx, &current, server); qerr != nil {
			return qerr
		}
	}
	return nil
}
// NOTE(marclop): There are some fields that aren't being aggregated
// like the circular buffers PauseNS, PauseEnd, BySize, and booleans
// (DebugGC, EnableGC).
func aggregateMemStats(from runtime.MemStats, to *runtime.MemStats) {
to.Alloc += from.Alloc
to.TotalAlloc += from.TotalAlloc
to.Sys += from.Sys
to.Lookups += from.Lookups
to.Mallocs += from.Mallocs
to.Frees += from.Frees
to.HeapAlloc += from.HeapAlloc
to.HeapSys += from.HeapSys
to.HeapIdle += from.HeapIdle
to.HeapInuse += from.HeapInuse
to.HeapReleased += from.HeapReleased
to.HeapObjects += from.HeapObjects
to.StackInuse += from.StackInuse
to.StackSys += from.StackSys
to.MSpanInuse += from.MSpanInuse
to.MSpanSys += from.MSpanSys
to.MCacheInuse += from.MCacheInuse
to.MCacheSys += from.MCacheSys
to.BuckHashSys += from.BuckHashSys
to.GCSys += from.GCSys
to.OtherSys += from.OtherSys
to.NextGC += from.NextGC
to.LastGC += from.LastGC
to.PauseTotalNs += from.PauseTotalNs
to.NumGC += from.NumGC
to.NumForcedGC += from.NumForcedGC
to.GCCPUFraction += from.GCCPUFraction
}
// aggregateLibbeatStats adds every field of from onto the matching field of
// to, so that to accumulates the sum of all snapshots passed in.
func aggregateLibbeatStats(from LibbeatStats, to *LibbeatStats) {
	// Output pipeline event counts.
	to.TotalEvents += from.TotalEvents
	to.ActiveEvents += from.ActiveEvents

	// Process-level runtime figures.
	to.RSSMemoryBytes += from.RSSMemoryBytes
	to.Goroutines += from.Goroutines
}
// aggregateResponseStats adds every Elastic APM intake value in from onto the
// running totals in to.
func aggregateResponseStats(from ElasticResponseStats, to *ElasticResponseStats) {
	// HTTP responses.
	to.TotalElasticResponses += from.TotalElasticResponses
	to.ErrorElasticResponses += from.ErrorElasticResponses

	// Processed events, by type.
	to.TransactionsProcessed += from.TransactionsProcessed
	to.SpansProcessed += from.SpansProcessed
	to.MetricsProcessed += from.MetricsProcessed
	to.ErrorsProcessed += from.ErrorsProcessed

	// Stream intake outcomes.
	to.IntakeEventsAccepted += from.IntakeEventsAccepted
	to.IntakeEventsErrorsInvalid += from.IntakeEventsErrorsInvalid
	to.IntakeEventsErrorsTooLarge += from.IntakeEventsErrorsTooLarge
}
// aggregateOTLPResponseStats adds the OTLP gRPC response values in from onto
// the running totals in to.
func aggregateOTLPResponseStats(from OTLPResponseStats, to *OTLPResponseStats) {
	// Metrics service.
	to.TotalOTLPMetricsResponses += from.TotalOTLPMetricsResponses
	to.ErrorOTLPMetricsResponses += from.ErrorOTLPMetricsResponses

	// Traces service.
	to.TotalOTLPTracesResponses += from.TotalOTLPTracesResponses
	to.ErrorOTLPTracesResponses += from.ErrorOTLPTracesResponses
}
// aggregateTailSamplingStats adds the tail-sampling storage sizes in from
// onto the running totals in to.
func aggregateTailSamplingStats(from TailSamplingStats, to *TailSamplingStats) {
	to.TBSVlogSize, to.TBSLsmSize = to.TBSVlogSize+from.TBSVlogSize, to.TBSLsmSize+from.TBSLsmSize
}