pkg/export/pool.go (119 lines of code) (raw):

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package export

import (
	"fmt"
	"hash/fnv"
	"sync"

	monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	// poolIntern is incremented once per intern call on a non-nil series.
	poolIntern = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "gcm_pool_intern_total",
			Help: "Time series memory intern operations.",
		},
	)
	// poolRelease is incremented once per release call on a non-nil series.
	poolRelease = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "gcm_pool_release_total",
			Help: "Time series memory intern release operations.",
		},
	)
)

// pool holds interned strings and label sets to deduplicate memory across
// cached monitoring_pb.TimeSeries entries.
type pool struct {
	// mtx guards strings and labels; intern and release hold it for their
	// whole duration.
	mtx sync.Mutex
	// strings maps a string value to its reference-counted pooled entry.
	strings map[string]*stringEntry
	// labels maps the 64-bit hash of a label set (see labelSum) to its
	// reference-counted pooled entry.
	labels map[uint64]*labelsEntry
}

// newPool returns an empty pool. If reg is non-nil, the pool's intern and
// release counters are registered with it.
//
// NOTE(review): poolIntern and poolRelease are package-level collectors, so
// calling newPool a second time with the same registerer presumably makes
// MustRegister panic on the duplicate registration — confirm against callers.
func newPool(reg prometheus.Registerer) *pool {
	if reg != nil {
		reg.MustRegister(poolIntern, poolRelease)
	}
	return &pool{
		strings: map[string]*stringEntry{},
		labels:  map[uint64]*labelsEntry{},
	}
}

// stringEntry is a pooled string together with its reference count.
type stringEntry struct {
	// refs is the number of interns of s that have not been released yet.
	refs uint32
	// s is the pooled copy of the string.
	s string
}

// labelsEntry is a pooled label set together with its reference count.
type labelsEntry struct {
	// refs is the number of interns of labels that have not been released yet.
	refs uint32
	// labels is the pooled label map handed back to callers of internLabels.
	labels map[string]string
}

// intern the strings and label sets in ts.
func (p *pool) intern(ts *monitoring_pb.TimeSeries) { if ts == nil { return } p.mtx.Lock() defer p.mtx.Unlock() poolIntern.Inc() ts.Resource.Labels = p.internLabels(ts.Resource.Labels) ts.Metric.Type = p.internString(ts.Metric.Type) ts.Metric.Labels = p.internLabels(ts.Metric.Labels) } func (p *pool) internString(s string) string { e, ok := p.strings[s] if !ok { e = &stringEntry{s: s} p.strings[e.s] = e } e.refs++ return e.s } func (p *pool) labelSum(lset map[string]string) uint64 { h := fnv.New64a() hashLabels(h, lset) return h.Sum64() } func (p *pool) internLabels(lset map[string]string) map[string]string { hsum := p.labelSum(lset) e, ok := p.labels[hsum] if !ok { for k, v := range lset { delete(lset, k) // In general Prometheus already optimizes label string allocations // and interning them won't do much in practice. But the overhead of doing it // is negligible and it decouples ous from Prometheus' implementation details // to be more robust to future changes. lset[p.internString(k)] = p.internString(v) } e = &labelsEntry{labels: lset} p.labels[hsum] = e } e.refs++ return e.labels } // release pooled memory interned for ts. func (p *pool) release(ts *monitoring_pb.TimeSeries) { // When first populating a cache entry, we may call this with an unset series. if ts == nil { return } p.mtx.Lock() defer p.mtx.Unlock() poolRelease.Inc() p.releaseLabels(ts.Resource.Labels) p.releaseString(ts.Metric.Type) p.releaseLabels(ts.Metric.Labels) } func (p *pool) releaseString(s string) { e, ok := p.strings[s] if !ok { panic(fmt.Sprintf("release of non-interned string %q", s)) } e.refs-- if e.refs == 0 { delete(p.strings, s) } } func (p *pool) releaseLabels(lset map[string]string) { hsum := p.labelSum(lset) e, ok := p.labels[hsum] if !ok { panic(fmt.Sprintf("release of non-interned labels %s", lset)) } e.refs-- if e.refs == 0 { for k, v := range e.labels { p.releaseString(k) p.releaseString(v) } delete(p.labels, hsum) } }