pkg/selfmonitor/metrics_imp_v2.go (384 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package selfmonitor
import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/alibaba/ilogtail/pkg/helper/math"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/protocol"
)
var (
_ CounterMetric = (*cumulativeCounterImp)(nil)
_ CounterMetric = (*counterImp)(nil)
_ CounterMetric = (*averageImp)(nil)
_ GaugeMetric = (*gaugeImp)(nil)
_ LatencyMetric = (*latencyImp)(nil)
_ StringMetric = (*strMetricImp)(nil)
_ CounterMetric = (*errorNumericMetric)(nil)
_ GaugeMetric = (*errorNumericMetric)(nil)
_ LatencyMetric = (*errorNumericMetric)(nil)
_ StringMetric = (*errorStrMetric)(nil)
)
func newMetric(metricType SelfMetricType, metricSet MetricSet, labelValues []string) Metric {
switch metricType {
case CumulativeCounterType:
return newCumulativeCounter(metricSet, labelValues)
case AverageType:
return newAverage(metricSet, labelValues)
case MaxType:
return newMax(metricSet, labelValues)
case CounterType:
return newDeltaCounter(metricSet, labelValues)
case GaugeType:
return newGauge(metricSet, labelValues)
case StringType:
return newStringMetric(metricSet, labelValues)
case LatencyType:
return newLatency(metricSet, labelValues)
}
return newErrorMetric(metricType, errors.New("invalid metric type"))
}
// ErrorMetrics always return error.
func newErrorMetric(metricType SelfMetricType, err error) Metric {
switch metricType {
case StringType:
return newErrorStringMetric(err)
default:
return newErrorNumericMetric(err)
}
}
// Deprecated: Use deltaImp instead.
// cumulativeCounterImp is a counter metric that can be incremented or decremented.
// It gets the cumulative value of the counter.
type cumulativeCounterImp struct {
value int64
Series
}
func newCumulativeCounter(ms MetricSet, labelValues []string) CounterMetric {
c := &cumulativeCounterImp{
Series: newSeries(ms, labelValues),
}
return c
}
func (c *cumulativeCounterImp) Add(delta int64) {
atomic.AddInt64(&c.value, delta)
}
func (c *cumulativeCounterImp) Collect() MetricValue[float64] {
value := atomic.LoadInt64(&c.value)
return MetricValue[float64]{Name: c.Name(), Value: float64(value)}
}
func (c *cumulativeCounterImp) Clear() {
atomic.StoreInt64(&c.value, 0)
}
func (c *cumulativeCounterImp) Serialize(log *protocol.Log) {
metricValue := c.Collect()
c.Series.SerializeWithStr(log, metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (c *cumulativeCounterImp) Export() map[string]string {
metricValue := c.Collect()
return c.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (c *cumulativeCounterImp) Type() SelfMetricType {
return CounterType
}
// delta is a counter metric that can be incremented or decremented.
// It gets the increased value in the last window.
type counterImp struct {
value int64
Series
}
func newDeltaCounter(ms MetricSet, labelValues []string) CounterMetric {
c := &counterImp{
Series: newSeries(ms, labelValues),
}
return c
}
func (c *counterImp) Add(delta int64) {
atomic.AddInt64(&c.value, delta)
}
func (c *counterImp) Collect() MetricValue[float64] {
value := atomic.SwapInt64(&c.value, 0)
return MetricValue[float64]{Name: c.Name(), Value: float64(value)}
}
func (c *counterImp) Clear() {
atomic.StoreInt64(&c.value, 0)
}
func (c *counterImp) Serialize(log *protocol.Log) {
metricValue := c.Collect()
c.Series.SerializeWithStr(log, metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (c *counterImp) Export() map[string]string {
metricValue := c.Collect()
return c.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (c *counterImp) Type() SelfMetricType {
return CounterType
}
// gauge is a metric that represents a single numerical value that can arbitrarily go up and down.
type gaugeImp struct {
value float64
Series
}
func newGauge(ms MetricSet, labelValues []string) GaugeMetric {
g := &gaugeImp{
Series: newSeries(ms, labelValues),
}
return g
}
func (g *gaugeImp) Set(f float64) {
math.AtomicStoreFloat64(&g.value, f)
}
func (g *gaugeImp) Collect() MetricValue[float64] {
return MetricValue[float64]{Name: g.Name(), Value: math.AtomicLoadFloat64(&g.value)}
}
func (g *gaugeImp) Clear() {
math.AtomicStoreFloat64(&g.value, 0)
}
func (g *gaugeImp) Serialize(log *protocol.Log) {
metricValue := g.Collect()
g.Series.SerializeWithStr(log, metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (g *gaugeImp) Export() map[string]string {
metricValue := g.Collect()
return g.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (g *gaugeImp) Type() SelfMetricType {
return GaugeType
}
// averageImp is a metric to compute the average value of a series of values in the last window.
// if there is no value added in the last window, the previous average value will be returned.
type averageImp struct {
sync.RWMutex
value int64
count int64
prevAvg float64
Series
}
func newAverage(ms MetricSet, labelValues []string) CounterMetric {
a := &averageImp{
Series: newSeries(ms, labelValues),
}
return a
}
func (a *averageImp) Add(f int64) {
a.Lock()
defer a.Unlock()
a.value += f
a.count++
}
func (a *averageImp) Collect() MetricValue[float64] {
a.RLock()
defer a.RUnlock()
if a.count == 0 {
return MetricValue[float64]{Name: a.Name(), Value: a.prevAvg}
}
avg := float64(a.value) / float64(a.count)
a.prevAvg, a.value, a.count = avg, 0, 0
return MetricValue[float64]{Name: a.Name(), Value: avg}
}
func (a *averageImp) Clear() {
a.Lock()
a.value = 0
a.count = 0
a.prevAvg = 0
a.Unlock()
}
func (a *averageImp) Serialize(log *protocol.Log) {
metricValue := a.Collect()
a.Series.SerializeWithStr(log, metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (a *averageImp) Export() map[string]string {
metricValue := a.Collect()
return a.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (a *averageImp) Type() SelfMetricType {
return GaugeType
}
// maxImp is a metric to compute the max value of a series of values in the last window.
// if there is no value added in the last window, zero will be returned.
type maxImp struct {
sync.RWMutex
value float64
Series
}
func newMax(ms MetricSet, labelValues []string) GaugeMetric {
m := &maxImp{
Series: newSeries(ms, labelValues),
}
return m
}
func (m *maxImp) Set(f float64) {
m.Lock()
defer m.Unlock()
if f > m.value {
m.value = f
}
}
func (m *maxImp) Collect() MetricValue[float64] {
m.RLock()
defer m.RUnlock()
metric := MetricValue[float64]{Name: m.Name(), Value: m.value}
m.value = 0
return metric
}
func (m *maxImp) Export() map[string]string {
metricValue := m.Collect()
return m.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64))
}
func (m *maxImp) Type() SelfMetricType {
return GaugeType
}
// latencyImp is a metric to compute the average latency of a series of values in the last window.
type latencyImp struct {
sync.Mutex
count int64
latencySum float64
Series
}
func newLatency(ms MetricSet, labelValues []string) LatencyMetric {
l := &latencyImp{
Series: newSeries(ms, labelValues),
}
return l
}
func (l *latencyImp) Observe(f float64) {
l.Lock()
defer l.Unlock()
l.count++
l.latencySum += f
}
func (l *latencyImp) Record(d time.Duration) {
l.Observe(float64(d))
}
func (l *latencyImp) Collect() MetricValue[float64] {
l.Lock()
defer l.Unlock()
if l.count == 0 {
return MetricValue[float64]{Name: l.Name(), Value: 0}
}
avg := l.latencySum / float64(l.count)
l.count, l.latencySum = 0, 0
return MetricValue[float64]{Name: l.Name(), Value: avg}
}
func (l *latencyImp) Clear() {
l.Lock()
defer l.Unlock()
l.count = 0
l.latencySum = 0
}
func (l *latencyImp) Serialize(log *protocol.Log) {
metricValue := l.Collect()
l.Series.SerializeWithStr(log, metricValue.Name, strconv.FormatFloat(metricValue.Value/1000, 'f', 4, 64)) // ns to us
}
func (l *latencyImp) Export() map[string]string {
metricValue := l.Collect()
return l.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value/1000, 'f', 4, 64)) // ns to us
}
func (l *latencyImp) Type() SelfMetricType {
return GaugeType
}
// strMetricImp is a metric that represents a single string value.
type strMetricImp struct {
sync.RWMutex
value string
Series
}
func newStringMetric(ms MetricSet, labelValues []string) StringMetric {
s := &strMetricImp{
Series: newSeries(ms, labelValues),
}
return s
}
func (s *strMetricImp) Set(str string) {
s.Lock()
defer s.Unlock()
s.value = str
}
func (s *strMetricImp) Collect() MetricValue[string] {
s.RLock()
defer s.RUnlock()
return MetricValue[string]{Name: s.Name(), Value: s.value}
}
func (s *strMetricImp) Clear() {
s.Lock()
s.value = ""
s.Unlock()
}
func (s *strMetricImp) Serialize(log *protocol.Log) {
metricValue := s.Collect()
s.Series.SerializeWithStr(log, metricValue.Name, metricValue.Value)
}
func (s *strMetricImp) Export() map[string]string {
metricValue := s.Collect()
return s.Series.Export(metricValue.Name, metricValue.Value)
}
func (s *strMetricImp) Type() SelfMetricType {
return GaugeType
}
type Series struct {
MetricSet
labelValues []string
}
func newSeries(ms MetricSet, labelValues []string) Series {
var indexToStore []string
if labelValues != nil {
indexToStore = make([]string, len(labelValues))
copy(indexToStore, labelValues)
}
return Series{
MetricSet: ms,
labelValues: indexToStore,
}
}
func (s Series) SerializeWithStr(log *protocol.Log, metricName, metricValueStr string) {
log.Contents = append(log.Contents,
&protocol.Log_Content{Key: metricName, Value: metricValueStr},
&protocol.Log_Content{Key: SelfMetricNameKey, Value: metricName})
for _, v := range s.ConstLabels() {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: v.Key, Value: v.Value})
}
labelNames := s.LabelKeys()
for i, v := range s.labelValues {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: labelNames[i], Value: v})
}
}
func (s Series) Export(metricName, metricValue string) map[string]string {
ret := make(map[string]string, len(s.ConstLabels())+len(s.labelValues)+2)
ret[metricName] = metricValue
ret[SelfMetricNameKey] = metricName
for _, v := range s.ConstLabels() {
ret[v.Key] = v.Value
}
for i, v := range s.labelValues {
ret[s.LabelKeys()[i]] = v
}
return ret
}
/*
Following are the metrics returned when WithLabel encountered an error.
*/
type errorNumericMetric struct {
err error
}
func (e *errorNumericMetric) Add(f int64) {
logger.Warning(context.Background(), "METRIC_WITH_LABEL_ALARM", "add", e.err)
}
func (e *errorNumericMetric) Set(f float64) {
logger.Warning(context.Background(), "METRIC_WITH_LABEL_ALARM", "set", e.err)
}
func (e *errorNumericMetric) Observe(f float64) {
logger.Warning(context.Background(), "METRIC_WITH_LABEL_ALARM", "observe", e.err)
}
func (e *errorNumericMetric) Serialize(log *protocol.Log) {}
func (e *errorNumericMetric) Export() map[string]string {
return nil
}
func (e *errorNumericMetric) Type() SelfMetricType {
return CounterType
}
func (e *errorNumericMetric) Collect() MetricValue[float64] {
return MetricValue[float64]{Name: "", Value: 0}
}
func (e *errorNumericMetric) Clear() {}
func newErrorNumericMetric(err error) *errorNumericMetric {
return &errorNumericMetric{err: err}
}
type errorStrMetric struct {
errorNumericMetric
}
func (e errorStrMetric) Set(s string) {
logger.Warning(context.Background(), "METRIC_WITH_LABEL_ALARM", "set", e.err)
}
func (e errorStrMetric) Collect() MetricValue[string] {
return MetricValue[string]{Name: "", Value: ""}
}
func newErrorStringMetric(err error) StringMetric {
return &errorStrMetric{errorNumericMetric: errorNumericMetric{err: err}}
}