processor/lsmintervalprocessor/internal/merger/limits/tracker.go (251 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package limits // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger/limits"
import (
"encoding/binary"
"errors"
"fmt"
"hash"
"slices"
"github.com/axiomhq/hyperloglog"
)
// Tracker tracks the configured limits while merging. It records the
// observed count as well as the unique overflow counts.
type Tracker struct {
maxCardinality uint64
// Note that overflow buckets will NOT be counted in observed count
// though, overflow buckets can have overflow of their own.
observedCount uint64
overflowCounts *hyperloglog.Sketch
}
func newTracker(maxCardinality uint64) *Tracker {
return &Tracker{maxCardinality: maxCardinality}
}
func (t *Tracker) Equal(other *Tracker) bool {
if t.maxCardinality != other.maxCardinality {
return false
}
if t.observedCount != other.observedCount {
return false
}
return t.EstimateOverflow() == other.EstimateOverflow()
}
func (t *Tracker) HasOverflow() bool {
return t.overflowCounts != nil
}
func (t *Tracker) EstimateOverflow() uint64 {
if t.overflowCounts == nil {
return 0
}
return t.overflowCounts.Estimate()
}
// CheckOverflow checks if overflow will happen on addition of a new entry with
// the provided hash denoting the entries ID. It assumes that any entry passed
// to this method is a NEW entry and the check for this is left to the caller.
func (t *Tracker) CheckOverflow(f func() hash.Hash64) bool {
if t.maxCardinality == 0 {
return false
}
if t.observedCount == t.maxCardinality {
if t.overflowCounts == nil {
// Creates an overflow with 14 precision
t.overflowCounts = hyperloglog.New14()
}
t.overflowCounts.InsertHash(f().Sum64())
return true
}
t.observedCount++
return false
}
// MergeEstimators merges the overflow estimators for the two trackers.
// Note that other required maintenance of the tracker for merge needs to
// done by the caller.
func (t *Tracker) MergeEstimators(other *Tracker) error {
if other.overflowCounts == nil {
// nothing to merge
return nil
}
if t.overflowCounts == nil {
t.overflowCounts = other.overflowCounts.Clone()
return nil
}
return t.overflowCounts.Merge(other.overflowCounts)
}
// AppendBinary marshals the tracker and appends the result to b.
func (t *Tracker) AppendBinary(b []byte) ([]byte, error) {
b = binary.AppendUvarint(b, t.observedCount)
// Make space for the sketch length. We reserve 2 bytes, which is sufficient
// for storing the length of a precision 14 sketch.
lenOffset := len(b)
b = append(b, 0, 0)
if t.overflowCounts != nil {
var err error
b, err = t.overflowCounts.AppendBinary(b)
if err != nil {
return nil, fmt.Errorf("failed to marshal limits: %w", err)
}
sketchLength := len(b) - lenOffset - 2
binary.BigEndian.PutUint16(b[lenOffset:lenOffset+2], uint16(sketchLength))
}
return b, nil
}
// Unmarshal unmarshals the encoded limits into t, and returns the number of
// bytes consumed.
//
// Example usage:
//
// var t Tracker
// n, err := t.Unmarshal(data)
// if err != nil {
// panic(err)
// }
// data = data[n:]
func (t *Tracker) Unmarshal(d []byte) (int, error) {
observedCount, n := binary.Uvarint(d)
if n <= 0 {
return 0, errors.New("failed to unmarshal tracker, invalid length")
}
t.observedCount = observedCount
d = d[n:]
if len(d) < 2 {
return 0, errors.New("failed to unmarshal tracker, invalid length")
}
sketchLength := int(binary.BigEndian.Uint16(d[:2]))
d = d[2:]
if sketchLength > 0 {
if len(d) < sketchLength {
return 0, errors.New("failed to unmarshal tracker, invalid length")
}
t.overflowCounts = hyperloglog.New14()
if err := t.overflowCounts.UnmarshalBinary(d[:sketchLength]); err != nil {
return 0, fmt.Errorf("failed to unmarshal tracker: %w", err)
}
}
return n + 2 + sketchLength, nil
}
// ScopeTracker tracks cardinality for scope metrics. They have a nested
// structure to track cardinality for each metrics for the scope and datapoints
// for the metrics.
type ScopeTracker struct {
*Tracker
metricLimit uint64
datapointLimit uint64
metrics []*MetricTracker
}
// NewMetricTracker creates new metric trackers to track metrics cardinality
// for the metrics within the current scope.
func (st *ScopeTracker) NewMetricTracker() *MetricTracker {
mt := &MetricTracker{
Tracker: newTracker(st.metricLimit),
datapointLimit: st.datapointLimit,
}
st.metrics = append(st.metrics, mt)
return mt
}
// GetMetricTracker returns a metric tracker from the index of the metric in
// the `pmetric.ScopeMetrics` slice of the `pmetric.Metrics` model.
func (st *ScopeTracker) GetMetricTracker(i int) *MetricTracker {
if i >= len(st.metrics) {
return nil
}
return st.metrics[i]
}
// MetricTracker tracks cardinality for metrics. They have a nested structure
// to track the cardinality of each datapoint for the metrics.
type MetricTracker struct {
*Tracker
datapointLimit uint64
datapoints []*Tracker
}
// NewDatapointTracker creates a new datapoint tracker to track datapoint
// cardinality for the datapoints within the current metrics.
func (mt *MetricTracker) NewDatapointTracker() *Tracker {
tracker := newTracker(mt.datapointLimit)
mt.datapoints = append(mt.datapoints, tracker)
return tracker
}
// GetDatapointTracker returns a datapoint tracker from the index of the
// datapoint in the `pmetric.Metrics` slice of the `pmetric.Metrics` model.
func (mt *MetricTracker) GetDatapointTracker(i int) *Tracker {
if i >= len(mt.datapoints) {
return nil
}
return mt.datapoints[i]
}
type trackerType uint8
const (
resourceTrackerType trackerType = iota
scopeTrackerType
metricTrackerType
dpTrackerType
)
// Trackers represent multiple tracker in an ordered structure. It takes advantage
// of the fact that pmetric DS is ordered and thus allows trackers to be created
// for each resource, scope, and datapoint independent of the pmetric datastructure.
// Note that this means that the order for pmetric and trackers are implicitly
// related and removing/adding new objects to pmetric should be accompanied by
// adding a corresponding tracker. The different types of trackers are:
//
// - Resource tracker: one for each `pmetric.Metrics`, tracks the cardinality of
// resources as per the configured limit.
// - Scope tracker: one for each `pmetric.ResourceMetrics`, tracks the cardinality
// of scopes within a resource as per the configured limit.
// - Metric tracker: one for each `pmetric.ScopeMetrics`, tracks the cardinality of
// metrics within a scope as per the configured limit.
// - Datapoint tracker: one for each `pmetric.Metric`, tracks the cardinality of
// datapoints within a metric as per the configured limit.
type Trackers struct {
resourceLimit uint64
scopeLimit uint64
metricLimit uint64
datapointLimit uint64
resource *Tracker
scope []*ScopeTracker
}
// NewTrackers creates trackers based on the configured overflow limits.
func NewTrackers(resourceLimit, scopeLimit, metricLimit, datapointLimit uint64) *Trackers {
return &Trackers{
resourceLimit: resourceLimit,
scopeLimit: scopeLimit,
metricLimit: metricLimit,
datapointLimit: datapointLimit,
// Create a resource tracker preemptively whenever a tracker is created
resource: newTracker(resourceLimit),
}
}
// GetResourceTracker returns the resource tracker.
func (t *Trackers) GetResourceTracker() *Tracker {
return t.resource
}
// GetScopeTracker returns the scope tracker based on the index of the resouce metrics
// whose scopes are to be tracked in the `pmetric.ResourceMetrics` slice of the
// `pmetric.Metrics` datamodel for the resource whose scopes are being tracked.
func (t *Trackers) GetScopeTracker(i int) *ScopeTracker {
if i >= len(t.scope) {
return nil
}
return t.scope[i]
}
func (t *Trackers) NewScopeTracker() *ScopeTracker {
scopeTracker := &ScopeTracker{
Tracker: newTracker(t.scopeLimit),
metricLimit: t.metricLimit,
datapointLimit: t.datapointLimit,
}
t.scope = append(t.scope, scopeTracker)
return scopeTracker
}
func (t *Trackers) AppendBinary(b []byte) ([]byte, error) {
if t == nil || t.resource == nil {
// if trackers is nil then nothing to marshal
return b, nil
}
n := 1 + len(t.scope)
for _, st := range t.scope {
n += len(st.metrics)
for _, mt := range st.metrics {
n += len(mt.datapoints)
}
}
// minimum 4 bytes per tracker (type=1, count=1+, sketch length=2)
b = slices.Grow(b, n*4)
b, err := marshalTracker(resourceTrackerType, t.resource, b)
if err != nil {
return nil, err
}
for _, st := range t.scope {
b, err = marshalTracker(scopeTrackerType, st.Tracker, b)
if err != nil {
return nil, err
}
for _, mt := range st.metrics {
b, err = marshalTracker(metricTrackerType, mt.Tracker, b)
if err != nil {
return nil, err
}
for _, dpt := range mt.datapoints {
b, err = marshalTracker(dpTrackerType, dpt, b)
if err != nil {
return nil, err
}
}
}
}
return b, nil
}
func (t *Trackers) Unmarshal(d []byte) error {
if len(d) == 0 {
return nil
}
var (
offset int
latestScopeTracker *ScopeTracker
latestMetricTracker *MetricTracker
)
for offset < len(d) {
trackerTyp := trackerType(d[offset])
offset += 1
// The below code will panic with NPE if the binary encoding is
// unexpected. The expected binary encoding must have one resource
// tracker then scope tracker followed by metric tracker followed by
// datapoint tracker.
var tracker *Tracker
switch trackerTyp {
case resourceTrackerType:
tracker = t.GetResourceTracker()
case scopeTrackerType:
latestScopeTracker = t.NewScopeTracker()
tracker = latestScopeTracker.Tracker
// Nil the previous metric tracker as we expect a new metric
// tracker for the new scope.
latestMetricTracker = nil
case metricTrackerType:
latestMetricTracker = latestScopeTracker.NewMetricTracker()
tracker = latestMetricTracker.Tracker
case dpTrackerType:
tracker = latestMetricTracker.NewDatapointTracker()
default:
return errors.New("invalid tracker found")
}
n, err := tracker.Unmarshal(d[offset:])
if err != nil {
return err
}
offset += n
}
return nil
}
func marshalTracker(typ trackerType, tracker *Tracker, result []byte) ([]byte, error) {
result = append(result, byte(typ))
return tracker.AppendBinary(result)
}