x-pack/metricbeat/module/statsd/server/registry.go (272 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package server import ( "time" "github.com/rcrowley/go-metrics" "github.com/elastic/beats/v7/metricbeat/helper/labelhash" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) var logger = logp.NewLogger("statd") type metric struct { name string tags map[string]string lastSeen time.Time metric interface{} } type registry struct { metrics map[string]map[string]*metric ttl time.Duration lastReport time.Time } type setMetric struct { set map[string]struct{} } func (s *setMetric) Add(val string) { s.set[val] = struct{}{} } func (s *setMetric) Reset() { s.set = map[string]struct{}{} } func (s *setMetric) Count() int { return len(s.set) } func newSetMetric() *setMetric { s := setMetric{} s.Reset() return &s } type deltaGaugeMetric struct { value float64 } func (d *deltaGaugeMetric) Inc(val float64) { d.value += val } func (d *deltaGaugeMetric) Set(val float64) { d.value = val } func (d *deltaGaugeMetric) Value() float64 { return d.value } // SamplingTimer is a timer that supports sampling type samplingTimer struct { metrics.Timer meter metrics.Meter histogram metrics.Histogram } // NewSamplingTimer returns a new SamplingTimer func newSamplingTimer() *samplingTimer { m := metrics.NewMeter() h := metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015)) return &samplingTimer{ Timer: metrics.NewCustomTimer(h, m), meter: m, histogram: h, } } // SampledUpdate will update the timer a sampled measurement func (s *samplingTimer) SampledUpdate(d time.Duration, sampleRate float64) { s.histogram.Update(int64(d)) s.meter.Mark(int64(1 / sampleRate)) } // Snapshot gets a snapshot of the SamplingTimer func (s *samplingTimer) Snapshot() samplingTimerSnapshot { return samplingTimerSnapshot{ histogram: s.histogram.Snapshot(), meter: s.meter.Snapshot(), } } type samplingTimerSnapshot struct { histogram metrics.Histogram meter metrics.Meter } // Count returns the number of events recorded at the time the snapshot was // taken. func (t *samplingTimerSnapshot) Count() int64 { return t.meter.Count() } // Max returns the maximum value at the time the snapshot was taken. func (t *samplingTimerSnapshot) Max() int64 { return t.histogram.Max() } // Mean returns the mean value at the time the snapshot was taken. func (t *samplingTimerSnapshot) Mean() float64 { return t.histogram.Mean() } // Min returns the minimum value at the time the snapshot was taken. func (t *samplingTimerSnapshot) Min() int64 { return t.histogram.Min() } // Percentile returns an arbitrary percentile of sampled values at the time the // snapshot was taken. func (t *samplingTimerSnapshot) Percentile(p float64) float64 { return t.histogram.Percentile(p) } // Percentiles returns a slice of arbitrary percentiles of sampled values at // the time the snapshot was taken. func (t *samplingTimerSnapshot) Percentiles(ps []float64) []float64 { return t.histogram.Percentiles(ps) } // Rate1 returns the one-minute moving average rate of events per second at the // time the snapshot was taken. func (t *samplingTimerSnapshot) Rate1() float64 { return t.meter.Rate1() } // Rate5 returns the five-minute moving average rate of events per second at // the time the snapshot was taken. func (t *samplingTimerSnapshot) Rate5() float64 { return t.meter.Rate5() } // Rate15 returns the fifteen-minute moving average rate of events per second // at the time the snapshot was taken. func (t *samplingTimerSnapshot) Rate15() float64 { return t.meter.Rate15() } // RateMean returns the meter's mean rate of events per second at the time the // snapshot was taken. func (t *samplingTimerSnapshot) RateMean() float64 { return t.meter.RateMean() } // Snapshot returns the snapshot. func (t *samplingTimerSnapshot) Snapshot() metrics.Timer { return t } // StdDev returns the standard deviation of the values at the time the snapshot // was taken. func (t *samplingTimerSnapshot) StdDev() float64 { return t.histogram.StdDev() } // Stop is a no-op. func (t *samplingTimerSnapshot) Stop() {} // Sum returns the sum at the time the snapshot was taken. func (t *samplingTimerSnapshot) Sum() int64 { return t.histogram.Sum() } // Time panics. func (*samplingTimerSnapshot) Time(func()) { panic("Time called on a samplingTimerSnapshot") } // Update panics. func (*samplingTimerSnapshot) Update(time.Duration) { panic("Update called on a samplingTimerSnapshot") } // Record the duration of an event that started at a time and ends now. func (t *samplingTimerSnapshot) UpdateSince(ts time.Time) { panic("Update called on a samplingTimerSnapshot") } // Variance returns the variance of the values in the sample. func (t *samplingTimerSnapshot) Variance() float64 { return t.histogram.Variance() } type metricsGroup struct { tags map[string]string metrics mapstr.M } func (r *registry) getMetric(metric interface{}) map[string]interface{} { values := map[string]interface{}{} switch m := metric.(type) { case metrics.Counter: values["count"] = m.Count() m.Clear() case *deltaGaugeMetric: values["value"] = m.Value() case metrics.Histogram: h := m.Snapshot() ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) values["count"] = h.Count() values["min"] = h.Min() values["max"] = h.Max() values["mean"] = h.Mean() values["stddev"] = h.StdDev() values["median"] = ps[0] values["p75"] = ps[1] values["p95"] = ps[2] values["p99"] = ps[3] values["p99_9"] = ps[4] case *samplingTimer: t := m.Snapshot() ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) values["count"] = t.Count() values["min"] = t.Min() values["max"] = t.Max() values["mean"] = t.Mean() values["stddev"] = t.StdDev() values["median"] = ps[0] values["p75"] = ps[1] values["p95"] = ps[2] values["p99"] = ps[3] values["p99_9"] = ps[4] values["1m_rate"] = t.Rate1() values["5m_rate"] = t.Rate5() values["15m_rate"] = t.Rate15() values["mean_rate"] = t.RateMean() case *setMetric: values["count"] = m.Count() m.Reset() } return values } func (r *registry) GetAll() []metricsGroup { var tags map[string]string now := time.Now() cutOff := now.Add(-r.ttl) // we do this to ensure metrics are reported at least once if cutOff.After(r.lastReport) { cutOff = r.lastReport } tagGroups := []metricsGroup{} for tagGroupKey, metricsMap := range r.metrics { fields := mapstr.M{} for key, m := range metricsMap { // cleanups according to ttl if r.ttl > 0 && m.lastSeen.Before(cutOff) { if stoppable, ok := m.metric.(metrics.Stoppable); ok { stoppable.Stop() } delete(metricsMap, key) continue } // all the .tags are the same for this metricsMap // we just need one tags = m.tags fields[m.name] = r.getMetric(m.metric) } // cleanup the tag group if it's empty if len(metricsMap) == 0 { delete(r.metrics, tagGroupKey) continue } tagGroups = append(tagGroups, metricsGroup{ metrics: fields, tags: tags, }) } r.lastReport = now return tagGroups } func (r *registry) Delete(name string, tags map[string]string) { if group, ok := r.metrics[r.metricHash(tags)]; ok { delete(group, name) } } func (r *registry) getOrNew(name string, tags map[string]string, new func() interface{}) interface{} { tagsKey := r.metricHash(tags) tc, ok := r.metrics[tagsKey] if !ok { counter := new() r.metrics[tagsKey] = map[string]*metric{name: { metric: counter, name: name, tags: tags, lastSeen: time.Now(), }} return counter } c, ok := tc[name] if !ok { counter := new() tc[name] = &metric{ metric: counter, name: name, tags: tags, lastSeen: time.Now(), } return counter } c.lastSeen = time.Now() return c.metric } func (r *registry) clearTypeChanged(name string, tags map[string]string) { // type was changed // we can try to support the situation where a new version of the app has changed a type in // a metric by deleting the old one and creating a new one logger.With("name", name).Warn("metric changed type") r.Delete(name, tags) } func (r *registry) GetOrNewCounter(name string, tags map[string]string) metrics.Counter { maybeCounter := r.getOrNew(name, tags, func() interface{} { return metrics.NewCounter() }) counter, ok := maybeCounter.(metrics.Counter) if ok { return counter } r.clearTypeChanged(name, tags) return r.GetOrNewCounter(name, tags) } func (r *registry) GetOrNewTimer(name string, tags map[string]string) *samplingTimer { timer, ok := r.getOrNew(name, tags, func() interface{} { return newSamplingTimer() }).(*samplingTimer) if ok { return timer } r.clearTypeChanged(name, tags) return r.GetOrNewTimer(name, tags) } func (r *registry) GetOrNewGauge64(name string, tags map[string]string) *deltaGaugeMetric { gauge, ok := r.getOrNew(name, tags, func() interface{} { return &deltaGaugeMetric{} }).(*deltaGaugeMetric) if ok { return gauge } r.clearTypeChanged(name, tags) return r.GetOrNewGauge64(name, tags) } func (r *registry) GetOrNewHistogram(name string, tags map[string]string) metrics.Histogram { histogram, ok := r.getOrNew(name, tags, func() interface{} { return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015)) }).(metrics.Histogram) if ok { return histogram } r.clearTypeChanged(name, tags) return r.GetOrNewHistogram(name, tags) } func (r *registry) GetOrNewSet(name string, tags map[string]string) *setMetric { setmetric, ok := r.getOrNew(name, tags, func() interface{} { return newSetMetric() }).(*setMetric) if ok { return setmetric } r.clearTypeChanged(name, tags) return r.GetOrNewSet(name, tags) } func (r *registry) metricHash(tags map[string]string) string { mapstrTags := mapstr.M{} for k, v := range tags { mapstrTags[k] = v } return labelhash.LabelHash(mapstrTags) }