internal/satellite/telemetry/metricservice/base.go (209 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package metricservice import ( "errors" "fmt" "math" "sync" "sync/atomic" "unicode/utf8" "unsafe" "github.com/apache/skywalking-satellite/internal/satellite/telemetry" v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) var errInconsistentCardinality = errors.New("inconsistent label cardinality") // Inspired by Prometheus(prometheus/client_golang), adding labels data is more efficient and saves memory const ( offset64 = 14695981039346656037 prime64 = 1099511628211 SeparatorByte byte = 255 ) type Metric interface { telemetry.Metric WriteMetric(appender *MetricsAppender) } type SubMetric interface { WriteMetric(base *BaseMetric, labels []*v3.Label, appender *MetricsAppender) } type BaseMetric struct { Metric Name string LabelKeys []string NewSubMetric func(labelValues ...string) SubMetric curry []curriedLabelValue mtx sync.RWMutex metrics map[uint64][]metricWithLabelValues } func NewBaseMetric(name string, labels []string, newSubMetric func(labelValues ...string) SubMetric) *BaseMetric { return &BaseMetric{ Name: name, LabelKeys: labels, NewSubMetric: newSubMetric, metrics: map[uint64][]metricWithLabelValues{}, } } func (b *BaseMetric) WriteMetric(appender *MetricsAppender) { for _, m := range b.metrics { for _, metric := range m { labels := make([]*v3.Label, len(b.LabelKeys)) for i := range b.LabelKeys { labels[i] = &v3.Label{ Name: b.LabelKeys[i], Value: metric.values[i], } } metric.metric.WriteMetric(b, labels, appender) } } } func (b *BaseMetric) GetMetricWithLabelValues(lvs ...string) (SubMetric, error) { h, err := b.hashLabelValues(lvs) if err != nil { return nil, err } return b.getOrCreateMetricWithLabelValues(h, lvs, b.curry), nil } type metricWithLabelValues struct { values []string metric SubMetric } // curriedLabelValue sets the curried value for a label at the given index. type curriedLabelValue struct { index int value string } func validateLabelValues(vals []string, expectedNumberOfValues int) error { if len(vals) != expectedNumberOfValues { return fmt.Errorf( "%s: expected %d label values but got %d in %#v", errInconsistentCardinality, expectedNumberOfValues, len(vals), vals, ) } for _, val := range vals { if !utf8.ValidString(val) { return fmt.Errorf("label value %q is not valid UTF-8", val) } } return nil } func (b *BaseMetric) hashLabelValues(vals []string) (uint64, error) { if err := validateLabelValues(vals, len(b.LabelKeys)-len(b.curry)); err != nil { return 0, err } var ( h = hashNew() curry = b.curry iVals, iCurry int ) for i := 0; i < len(b.LabelKeys); i++ { if iCurry < len(curry) && curry[iCurry].index == i { h = hashAdd(h, curry[iCurry].value) iCurry++ } else { h = hashAdd(h, vals[iVals]) iVals++ } h = hashAddByte(h, SeparatorByte) } return h, nil } func (b *BaseMetric) getOrCreateMetricWithLabelValues( hash uint64, lvs []string, curry []curriedLabelValue, ) SubMetric { b.mtx.RLock() metric, ok := b.getMetricWithHashAndLabelValues(hash, lvs, curry) b.mtx.RUnlock() if ok { return metric } b.mtx.Lock() defer b.mtx.Unlock() metric, ok = b.getMetricWithHashAndLabelValues(hash, lvs, curry) if !ok { inlinedLVs := inlineLabelValues(lvs, curry) metric = b.NewSubMetric(inlinedLVs...) b.metrics[hash] = append(b.metrics[hash], metricWithLabelValues{values: inlinedLVs, metric: metric}) } return metric } func (b *BaseMetric) getMetricWithHashAndLabelValues( h uint64, lvs []string, curry []curriedLabelValue, ) (SubMetric, bool) { metrics, ok := b.metrics[h] if ok { if i := findMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) { return metrics[i].metric, true } } return nil, false } func inlineLabelValues(lvs []string, curry []curriedLabelValue) []string { labelValues := make([]string, len(lvs)+len(curry)) var iCurry, iLVs int for i := range labelValues { if iCurry < len(curry) && curry[iCurry].index == i { labelValues[i] = curry[iCurry].value iCurry++ continue } labelValues[i] = lvs[iLVs] iLVs++ } return labelValues } func findMetricWithLabelValues( metrics []metricWithLabelValues, lvs []string, curry []curriedLabelValue, ) int { for i, metric := range metrics { if matchLabelValues(metric.values, lvs, curry) { return i } } return len(metrics) } func matchLabelValues(values, lvs []string, curry []curriedLabelValue) bool { if len(values) != len(lvs)+len(curry) { return false } var iLVs, iCurry int for i, v := range values { if iCurry < len(curry) && curry[iCurry].index == i { if v != curry[iCurry].value { return false } iCurry++ continue } if v != lvs[iLVs] { return false } iLVs++ } return true } // hashNew initializies a new fnv64a hash value. func hashNew() uint64 { return offset64 } // hashAdd adds a string to a fnv64a hash value, returning the updated hash. func hashAdd(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 } return h } // hashAddByte adds a byte to a fnv64a hash value, returning the updated hash. func hashAddByte(h uint64, b byte) uint64 { h ^= uint64(b) h *= prime64 return h } func addFloat64(addr *float64, delta float64) { for { old := math.Float64frombits(atomic.LoadUint64((*uint64)(unsafe.Pointer(addr)))) newVal := old + delta if atomic.CompareAndSwapUint64( (*uint64)(unsafe.Pointer(addr)), math.Float64bits(old), math.Float64bits(newVal), ) { break } } }