pkg/selfmonitor/metrics_vector_imp.go (286 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package selfmonitor import ( "fmt" "sync" "unsafe" "github.com/alibaba/ilogtail/pkg/helper/pool" "github.com/alibaba/ilogtail/pkg/protocol" ) const ( defaultTagValue = "-" ) var ( DefaultCacheFactory = NewMapCache ) // SetMetricVectorCacheFactory allows users to set the cache factory for the metric vector, like Prometheus SDK. func SetMetricVectorCacheFactory(factory func(MetricSet) MetricVectorCache) { DefaultCacheFactory = factory } type ( CumulativeCounterMetricVector = MetricVector[CounterMetric] AverageMetricVector = MetricVector[CounterMetric] MaxMetricVector = MetricVector[GaugeMetric] CounterMetricVector = MetricVector[CounterMetric] GaugeMetricVector = MetricVector[GaugeMetric] LatencyMetricVector = MetricVector[LatencyMetric] StringMetricVector = MetricVector[StringMetric] ) // Deprecated: use NewCounterMetricVector instead. // NewCumulativeCounterMetricVector creates a new CounterMetricVector. // Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewCumulativeCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector { return NewMetricVector[CounterMetric](metricName, CumulativeCounterType, constLabels, labelNames) } // NewCounterMetricVector creates a new DeltaMetricVector. 
// Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CounterMetricVector { return NewMetricVector[CounterMetric](metricName, CounterType, constLabels, labelNames) } // NewAverageMetricVector creates a new AverageMetricVector. // Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewAverageMetricVector(metricName string, constLabels map[string]string, labelNames []string) AverageMetricVector { return NewMetricVector[CounterMetric](metricName, AverageType, constLabels, labelNames) } // NewMaxMetricVector creates a new MaxMetricVector. // Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewMaxMetricVector(metricName string, constLabels map[string]string, labelNames []string) MaxMetricVector { return NewMetricVector[GaugeMetric](metricName, MaxType, constLabels, labelNames) } // NewGaugeMetricVector creates a new GaugeMetricVector. // Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewGaugeMetricVector(metricName string, constLabels map[string]string, labelNames []string) GaugeMetricVector { return NewMetricVector[GaugeMetric](metricName, GaugeType, constLabels, labelNames) } // NewStringMetricVector creates a new StringMetricVector. // Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewStringMetricVector(metricName string, constLabels map[string]string, labelNames []string) StringMetricVector { return NewMetricVector[StringMetric](metricName, StringType, constLabels, labelNames) } // NewLatencyMetricVector creates a new LatencyMetricVector. 
// Note that MetricVector doesn't expose Collect API by default. Plugins Developers should be careful to collect metrics manually. func NewLatencyMetricVector(metricName string, constLabels map[string]string, labelNames []string) LatencyMetricVector { return NewMetricVector[LatencyMetric](metricName, LatencyType, constLabels, labelNames) } // NewCumulativeCounterMetricVectorAndRegister creates a new CounterMetricVector and register it to the MetricsRecord. func NewCumulativeCounterMetricVectorAndRegister(mr *MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector { v := NewMetricVector[CounterMetric](metricName, CumulativeCounterType, constLabels, labelNames) mr.RegisterMetricCollector(v) return v } // NewAverageMetricVectorAndRegister creates a new AverageMetricVector and register it to the MetricsRecord. func NewAverageMetricVectorAndRegister(mr *MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) AverageMetricVector { v := NewMetricVector[CounterMetric](metricName, AverageType, constLabels, labelNames) mr.RegisterMetricCollector(v) return v } // NewCounterMetricVectorAndRegister creates a new DeltaMetricVector and register it to the MetricsRecord. func NewCounterMetricVectorAndRegister(mr *MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) CounterMetricVector { v := NewMetricVector[CounterMetric](metricName, CounterType, constLabels, labelNames) mr.RegisterMetricCollector(v) return v } // NewGaugeMetricVectorAndRegister creates a new GaugeMetricVector and register it to the MetricsRecord. 
func NewGaugeMetricVectorAndRegister(mr *MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) GaugeMetricVector { v := NewMetricVector[GaugeMetric](metricName, GaugeType, constLabels, labelNames) mr.RegisterMetricCollector(v) return v } // NewLatencyMetricVectorAndRegister creates a new LatencyMetricVector and register it to the MetricsRecord. func NewLatencyMetricVectorAndRegister(mr *MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) LatencyMetricVector { v := NewMetricVector[LatencyMetric](metricName, LatencyType, constLabels, labelNames) mr.RegisterMetricCollector(v) return v } // NewStringMetricVectorAndRegister creates a new StringMetricVector and register it to the MetricsRecord. func NewStringMetricVectorAndRegister(mr *MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) StringMetricVector { v := NewMetricVector[StringMetric](metricName, StringType, constLabels, labelNames) mr.RegisterMetricCollector(v) return v } // NewCumulativeCounterMetric creates a new CounterMetric. func NewCumulativeCounterMetric(n string, lables ...*protocol.Log_Content) CounterMetric { return NewCumulativeCounterMetricVector(n, convertLabels(lables), nil).WithLabels() } // NewAverageMetric creates a new AverageMetric. func NewAverageMetric(n string, lables ...*protocol.Log_Content) CounterMetric { return NewAverageMetricVector(n, convertLabels(lables), nil).WithLabels() } // NewCounterMetric creates a new DeltaMetric. func NewCounterMetric(n string, lables ...*protocol.Log_Content) CounterMetric { return NewCounterMetricVector(n, convertLabels(lables), nil).WithLabels() } // NewGaugeMetric creates a new GaugeMetric. func NewGaugeMetric(n string, lables ...*protocol.Log_Content) GaugeMetric { return NewGaugeMetricVector(n, convertLabels(lables), nil).WithLabels() } // NewStringMetric creates a new StringMetric. 
func NewStringMetric(n string, lables ...*protocol.Log_Content) StringMetric {
	return NewStringMetricVector(n, convertLabels(lables), nil).WithLabels()
}

// NewLatencyMetric creates a new LatencyMetric.
// lables become the const labels of the underlying vector, which has no
// variable label keys.
func NewLatencyMetric(n string, lables ...*protocol.Log_Content) LatencyMetric {
	return NewLatencyMetricVector(n, convertLabels(lables), nil).WithLabels()
}

// NewCumulativeCounterMetricAndRegister creates a new CounterMetric and registers its metricVector to the MetricsRecord.
// The vector is built with NewMetricVector directly: its result already
// satisfies MetricCollector, so no runtime type assertion is needed.
func NewCumulativeCounterMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) CounterMetric {
	mv := NewMetricVector[CounterMetric](n, CumulativeCounterType, convertLabels(lables), nil)
	c.RegisterMetricCollector(mv)
	return mv.WithLabels()
}

// NewCounterMetricAndRegister creates a new DeltaMetric and registers its metricVector to the MetricsRecord.
// The vector is built with NewMetricVector directly: its result already
// satisfies MetricCollector, so no runtime type assertion is needed.
func NewCounterMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) CounterMetric {
	mv := NewMetricVector[CounterMetric](n, CounterType, convertLabels(lables), nil)
	c.RegisterMetricCollector(mv)
	return mv.WithLabels()
}

// NewAverageMetricAndRegister creates a new AverageMetric and registers its metricVector to the MetricsRecord.
// The vector is built with NewMetricVector directly: its result already
// satisfies MetricCollector, so no runtime type assertion is needed.
func NewAverageMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) CounterMetric {
	mv := NewMetricVector[CounterMetric](n, AverageType, convertLabels(lables), nil)
	c.RegisterMetricCollector(mv)
	return mv.WithLabels()
}

// NewMaxMetricAndRegister creates a new MaxMetric and registers its metricVector to the MetricsRecord.
// The vector is built with NewMetricVector directly: its result already
// satisfies MetricCollector, so no runtime type assertion is needed.
func NewMaxMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) GaugeMetric {
	mv := NewMetricVector[GaugeMetric](n, MaxType, convertLabels(lables), nil)
	c.RegisterMetricCollector(mv)
	return mv.WithLabels()
}

// NewGaugeMetricAndRegister creates a new GaugeMetric and registers its metricVector to the MetricsRecord.
func NewGaugeMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) GaugeMetric { mv := NewGaugeMetricVector(n, convertLabels(lables), nil) c.RegisterMetricCollector(mv.(MetricCollector)) return mv.WithLabels() } // NewLatencyMetricAndRegister creates a new LatencyMetric and register it's metricVector to the MetricsRecord. func NewLatencyMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) LatencyMetric { mv := NewLatencyMetricVector(n, convertLabels(lables), nil) c.RegisterMetricCollector(mv.(MetricCollector)) return mv.WithLabels() } // NewStringMetricAndRegister creates a new StringMetric and register it's metricVector to the MetricsRecord. func NewStringMetricAndRegister(c *MetricsRecord, n string, lables ...*protocol.Log_Content) StringMetric { mv := NewStringMetricVector(n, convertLabels(lables), nil) c.RegisterMetricCollector(mv.(MetricCollector)) return mv.WithLabels() } var ( _ MetricCollector = (*MetricVectorImpl[CounterMetric])(nil) _ MetricSet = (*MetricVectorImpl[StringMetric])(nil) _ MetricVector[CounterMetric] = (*MetricVectorImpl[CounterMetric])(nil) _ MetricVector[GaugeMetric] = (*MetricVectorImpl[GaugeMetric])(nil) _ MetricVector[LatencyMetric] = (*MetricVectorImpl[LatencyMetric])(nil) _ MetricVector[StringMetric] = (*MetricVectorImpl[StringMetric])(nil) ) type MetricVectorAndCollector[T Metric] interface { MetricVector[T] MetricCollector } type MetricVectorImpl[T Metric] struct { *metricVector } // NewMetricVector creates a new MetricVector. // It returns a MetricVectorAndCollector, which is a MetricVector and a MetricCollector. // For plugin developers, they should use MetricVector APIs to create metrics. // For agent itself, it uses MetricCollector APIs to collect metrics. 
func NewMetricVector[T Metric](metricName string, metricType SelfMetricType, constLabels map[string]string, labelNames []string) MetricVectorAndCollector[T] { return &MetricVectorImpl[T]{ metricVector: newMetricVector(metricName, metricType, constLabels, labelNames), } } func (m *MetricVectorImpl[T]) WithLabels(labels ...LabelPair) T { return m.metricVector.WithLabels(labels...).(T) } // MetricVectorCache is a cache for MetricVector. type MetricVectorCache interface { // return a metric with the given label values. // Note that the label values are sorted according to the label keys in MetricSet. WithLabelValues([]string) Metric MetricCollector } type metricVector struct { name string // metric name metricType SelfMetricType constLabels []LabelPair // constLabels is the labels that are not changed when the metric is created. labelKeys []string // labelNames is the names of the labels. The values of the labels can be changed. indexPool pool.GenericPool[string] // index is []string, which is sorted according to labelNames. cache MetricVectorCache // collector is a map[string]Metric, key is the index of the metric. 
} func newMetricVector( metricName string, metricType SelfMetricType, constLabels map[string]string, labelNames []string, ) *metricVector { mv := &metricVector{ name: metricName, metricType: metricType, labelKeys: labelNames, indexPool: pool.NewGenericPool(func() []string { return make([]string, 0, 10) }), } for k, v := range constLabels { mv.constLabels = append(mv.constLabels, LabelPair{Key: k, Value: v}) } mv.cache = DefaultCacheFactory(mv) return mv } func (v *metricVector) Name() string { return v.name } func (v *metricVector) Type() SelfMetricType { return v.metricType } func (v *metricVector) ConstLabels() []LabelPair { return v.constLabels } func (v *metricVector) LabelKeys() []string { return v.labelKeys } func (v *metricVector) WithLabels(labels ...LabelPair) Metric { labelValues, err := v.buildLabelValues(labels) if err != nil { return newErrorMetric(v.metricType, err) } defer v.indexPool.Put(labelValues) return v.cache.WithLabelValues(*labelValues) } func (v *metricVector) Collect() []Metric { return v.cache.Collect() } // buildLabelValues return the index func (v *metricVector) buildLabelValues(labels []LabelPair) (*[]string, error) { if len(labels) > len(v.labelKeys) { return nil, fmt.Errorf("too many labels, expected %d, got %d. 
defined labels: %v", len(v.labelKeys), len(labels), v.labelKeys) } index := v.indexPool.Get() for range v.labelKeys { *index = append(*index, defaultTagValue) } for d, tag := range labels { if v.labelKeys[d] == tag.Key { // fast path (*index)[d] = tag.Value } else { err := v.slowConstructIndex(index, tag) if err != nil { v.indexPool.Put(index) return nil, err } } } return index, nil } func (v *metricVector) slowConstructIndex(index *[]string, tag LabelPair) error { for i, tagName := range v.labelKeys { if tagName == tag.Key { (*index)[i] = tag.Value return nil } } return fmt.Errorf("undefined label: %s in %v", tag.Key, v.labelKeys) } type MapCache struct { MetricSet bytesPool pool.GenericPool[byte] sync.Map } func NewMapCache(metricSet MetricSet) MetricVectorCache { return &MapCache{ MetricSet: metricSet, bytesPool: pool.NewGenericPool(func() []byte { return make([]byte, 0, 128) }), } } func (v *MapCache) WithLabelValues(labelValues []string) Metric { buffer := v.bytesPool.Get() for _, tagValue := range labelValues { *buffer = append(*buffer, '|') *buffer = append(*buffer, tagValue...) } /* #nosec G103 */ k := *(*string)(unsafe.Pointer(buffer)) acV, loaded := v.Load(k) if loaded { metric := acV.(Metric) v.bytesPool.Put(buffer) return metric } newMetric := newMetric(v.Type(), v, labelValues) acV, loaded = v.LoadOrStore(k, newMetric) if loaded { v.bytesPool.Put(buffer) } return acV.(Metric) } func (v *MapCache) Collect() []Metric { res := make([]Metric, 0, 10) v.Range(func(key, value interface{}) bool { res = append(res, value.(Metric)) return true }) return res } func convertLabels(labels []*protocol.Log_Content) map[string]string { if len(labels) == 0 { return nil } l := make(map[string]string) for _, label := range labels { l[label.Key] = label.Value } return l }