aggregators/codec.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package aggregators

// TODO(lahsivjar): Add a test using reflect to validate if all
// fields are properly set.

import (
	"encoding/binary"
	"errors"
	"slices"
	"sort"
	"time"

	"github.com/axiomhq/hyperloglog"

	"github.com/elastic/apm-aggregation/aggregationpb"
	"github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram"
	"github.com/elastic/apm-aggregation/aggregators/nullable"
	"github.com/elastic/apm-data/model/modelpb"
)

// CombinedMetricsKeyEncodedSize is the size of an encoded
// CombinedMetricsKey in bytes. The encoding is laid out as follows:
// - 2 bytes for interval encoding
// - 8 bytes for timestamp encoding
// - 16 bytes for ID encoding
// - 2 bytes for partition ID
const CombinedMetricsKeyEncodedSize = 28

// MarshalBinaryToSizedBuffer will marshal the combined metrics key into
// its binary representation. The encoded byte slice will be used as a
// key in pebbledb. To ensure efficient sorting and time range based
// queries, the first 2 bytes of the encoded slice are the aggregation
// interval, the next 8 bytes are the processing time, followed by the
// 16-byte combined metrics ID, and the last 2 bytes are the partition
// ID. Within a given interval and processing time, entries are thus
// ordered by ID first and then by partition ID.
func (k *CombinedMetricsKey) MarshalBinaryToSizedBuffer(data []byte) error {
	ivlSeconds := uint16(k.Interval.Seconds())
	if len(data) != CombinedMetricsKeyEncodedSize {
		return errors.New("failed to marshal due to incorrect sized buffer")
	}
	var offset int

	binary.BigEndian.PutUint16(data[offset:], ivlSeconds)
	offset += 2

	binary.BigEndian.PutUint64(data[offset:], uint64(k.ProcessingTime.Unix()))
	offset += 8

	copy(data[offset:], k.ID[:])
	offset += 16

	binary.BigEndian.PutUint16(data[offset:], k.PartitionID)
	return nil
}

// UnmarshalBinary will convert the byte encoded data into CombinedMetricsKey.
func (k *CombinedMetricsKey) UnmarshalBinary(data []byte) error {
	if len(data) < CombinedMetricsKeyEncodedSize {
		return errors.New("invalid encoded data of insufficient length")
	}
	var offset int
	k.Interval = time.Duration(binary.BigEndian.Uint16(data[offset:offset+2])) * time.Second
	offset += 2

	k.ProcessingTime = time.Unix(int64(binary.BigEndian.Uint64(data[offset:offset+8])), 0)
	offset += 8

	copy(k.ID[:], data[offset:offset+len(k.ID)])
	offset += len(k.ID)

	k.PartitionID = binary.BigEndian.Uint16(data[offset:])
	return nil
}

// SizeBinary returns the size of the byte array required to encode the
// combined metrics key. The encoded size of CombinedMetricsKey is
// constant; alternatively, the constant CombinedMetricsKeyEncodedSize
// can be used directly.
func (k *CombinedMetricsKey) SizeBinary() int {
	return CombinedMetricsKeyEncodedSize
}

// GetEncodedCombinedMetricsKeyWithoutPartitionID is a util function to
// remove the partition bits from an encoded CombinedMetricsKey.
func GetEncodedCombinedMetricsKeyWithoutPartitionID(src []byte) []byte {
	var buf [CombinedMetricsKeyEncodedSize]byte
	copy(buf[:CombinedMetricsKeyEncodedSize-2], src)
	return buf[:]
}
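// Illustrative round trip through the fixed-size key encoding (a sketch,
// not part of the original file). Field types are assumed from their use
// above: Interval is a time.Duration, ProcessingTime a time.Time
// (truncated to whole seconds by the encoding), ID a [16]byte, and
// PartitionID a uint16.
//
//	key := CombinedMetricsKey{
//		Interval:       time.Minute,
//		ProcessingTime: time.Unix(1700000000, 0),
//		PartitionID:    3,
//	}
//	buf := make([]byte, key.SizeBinary())
//	if err := key.MarshalBinaryToSizedBuffer(buf); err != nil {
//		// the buffer must be exactly CombinedMetricsKeyEncodedSize bytes
//	}
//	var decoded CombinedMetricsKey
//	_ = decoded.UnmarshalBinary(buf) // decoded now equals key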
// ToProto converts combinedMetrics to its protobuf representation.
func (m *combinedMetrics) ToProto() *aggregationpb.CombinedMetrics {
	var pb aggregationpb.CombinedMetrics
	pb.ServiceMetrics = slices.Grow(pb.ServiceMetrics, len(m.Services))[:len(m.Services)]
	var i int
	for k, sm := range m.Services {
		if pb.ServiceMetrics[i] == nil {
			pb.ServiceMetrics[i] = &aggregationpb.KeyedServiceMetrics{}
		}
		pb.ServiceMetrics[i].Key = k.ToProto()
		pb.ServiceMetrics[i].Metrics = sm.ToProto()
		i++
	}
	if m.OverflowServicesEstimator != nil {
		pb.OverflowServices = m.OverflowServices.ToProto()
		pb.OverflowServicesEstimator = hllBytes(m.OverflowServicesEstimator)
	}
	pb.EventsTotal = m.EventsTotal
	pb.YoungestEventTimestamp = m.YoungestEventTimestamp
	return &pb
}

// ToProto converts serviceAggregationKey to its protobuf representation.
func (k *serviceAggregationKey) ToProto() *aggregationpb.ServiceAggregationKey {
	var pb aggregationpb.ServiceAggregationKey
	pb.Timestamp = modelpb.FromTime(k.Timestamp)
	pb.ServiceName = k.ServiceName
	pb.ServiceEnvironment = k.ServiceEnvironment
	pb.ServiceLanguageName = k.ServiceLanguageName
	pb.AgentName = k.AgentName
	pb.GlobalLabelsStr = []byte(k.GlobalLabelsStr)
	return &pb
}

// FromProto converts protobuf representation to serviceAggregationKey.
func (k *serviceAggregationKey) FromProto(pb *aggregationpb.ServiceAggregationKey) {
	k.Timestamp = modelpb.ToTime(pb.Timestamp)
	k.ServiceName = pb.ServiceName
	k.ServiceEnvironment = pb.ServiceEnvironment
	k.ServiceLanguageName = pb.ServiceLanguageName
	k.AgentName = pb.AgentName
	k.GlobalLabelsStr = string(pb.GlobalLabelsStr)
}

// ToProto converts serviceMetrics to its protobuf representation.
func (m *serviceMetrics) ToProto() *aggregationpb.ServiceMetrics {
	var pb aggregationpb.ServiceMetrics
	pb.OverflowGroups = m.OverflowGroups.ToProto()

	pb.TransactionMetrics = slices.Grow(pb.TransactionMetrics, len(m.TransactionGroups))
	for _, tm := range m.TransactionGroups {
		pb.TransactionMetrics = append(pb.TransactionMetrics, tm)
	}

	pb.ServiceTransactionMetrics = slices.Grow(pb.ServiceTransactionMetrics, len(m.ServiceTransactionGroups))
	for _, stm := range m.ServiceTransactionGroups {
		pb.ServiceTransactionMetrics = append(pb.ServiceTransactionMetrics, stm)
	}

	pb.SpanMetrics = slices.Grow(pb.SpanMetrics, len(m.SpanGroups))
	for _, sm := range m.SpanGroups {
		pb.SpanMetrics = append(pb.SpanMetrics, sm)
	}

	return &pb
}
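// Note on the preallocation pattern above (an explanatory sketch, not
// original code): slices.Grow reserves capacity, and the immediate
// reslice to the final length makes every element addressable by index.
// On a fresh backing array the elements are zero-valued; on a reused one
// the previous pointers reappear, which is what the
// `if pb.ServiceMetrics[i] == nil` checks exploit to recycle allocations.
//
//	s := slices.Grow([]*aggregationpb.KeyedServiceMetrics(nil), 3)[:3]
//	// len(s) == 3, every element nil on a fresh array
//	for i := range s {
//		if s[i] == nil {
//			s[i] = &aggregationpb.KeyedServiceMetrics{}
//		}
//	}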
// ToProto converts transactionAggregationKey to its protobuf representation.
func (k *transactionAggregationKey) ToProto() *aggregationpb.TransactionAggregationKey {
	var pb aggregationpb.TransactionAggregationKey
	pb.TraceRoot = k.TraceRoot
	pb.ContainerId = k.ContainerID
	pb.KubernetesPodName = k.KubernetesPodName
	pb.ServiceVersion = k.ServiceVersion
	pb.ServiceNodeName = k.ServiceNodeName
	pb.ServiceRuntimeName = k.ServiceRuntimeName
	pb.ServiceRuntimeVersion = k.ServiceRuntimeVersion
	pb.ServiceLanguageVersion = k.ServiceLanguageVersion
	pb.HostHostname = k.HostHostname
	pb.HostName = k.HostName
	pb.HostOsPlatform = k.HostOSPlatform
	pb.EventOutcome = k.EventOutcome
	pb.TransactionName = k.TransactionName
	pb.TransactionType = k.TransactionType
	pb.TransactionResult = k.TransactionResult
	pb.FaasColdstart = uint32(k.FAASColdstart)
	pb.FaasId = k.FAASID
	pb.FaasName = k.FAASName
	pb.FaasVersion = k.FAASVersion
	pb.FaasTriggerType = k.FAASTriggerType
	pb.CloudProvider = k.CloudProvider
	pb.CloudRegion = k.CloudRegion
	pb.CloudAvailabilityZone = k.CloudAvailabilityZone
	pb.CloudServiceName = k.CloudServiceName
	pb.CloudAccountId = k.CloudAccountID
	pb.CloudAccountName = k.CloudAccountName
	pb.CloudMachineType = k.CloudMachineType
	pb.CloudProjectId = k.CloudProjectID
	pb.CloudProjectName = k.CloudProjectName
	return &pb
}

// FromProto converts protobuf representation to transactionAggregationKey.
func (k *transactionAggregationKey) FromProto(pb *aggregationpb.TransactionAggregationKey) {
	k.TraceRoot = pb.TraceRoot
	k.ContainerID = pb.ContainerId
	k.KubernetesPodName = pb.KubernetesPodName
	k.ServiceVersion = pb.ServiceVersion
	k.ServiceNodeName = pb.ServiceNodeName
	k.ServiceRuntimeName = pb.ServiceRuntimeName
	k.ServiceRuntimeVersion = pb.ServiceRuntimeVersion
	k.ServiceLanguageVersion = pb.ServiceLanguageVersion
	k.HostHostname = pb.HostHostname
	k.HostName = pb.HostName
	k.HostOSPlatform = pb.HostOsPlatform
	k.EventOutcome = pb.EventOutcome
	k.TransactionName = pb.TransactionName
	k.TransactionType = pb.TransactionType
	k.TransactionResult = pb.TransactionResult
	k.FAASColdstart = nullable.Bool(pb.FaasColdstart)
	k.FAASID = pb.FaasId
	k.FAASName = pb.FaasName
	k.FAASVersion = pb.FaasVersion
	k.FAASTriggerType = pb.FaasTriggerType
	k.CloudProvider = pb.CloudProvider
	k.CloudRegion = pb.CloudRegion
	k.CloudAvailabilityZone = pb.CloudAvailabilityZone
	k.CloudServiceName = pb.CloudServiceName
	k.CloudAccountID = pb.CloudAccountId
	k.CloudAccountName = pb.CloudAccountName
	k.CloudMachineType = pb.CloudMachineType
	k.CloudProjectID = pb.CloudProjectId
	k.CloudProjectName = pb.CloudProjectName
}

// ToProto converts serviceTransactionAggregationKey to its protobuf representation.
func (k *serviceTransactionAggregationKey) ToProto() *aggregationpb.ServiceTransactionAggregationKey {
	var pb aggregationpb.ServiceTransactionAggregationKey
	pb.TransactionType = k.TransactionType
	return &pb
}

// FromProto converts protobuf representation to serviceTransactionAggregationKey.
func (k *serviceTransactionAggregationKey) FromProto(pb *aggregationpb.ServiceTransactionAggregationKey) {
	k.TransactionType = pb.TransactionType
}

// ToProto converts spanAggregationKey to its protobuf representation.
func (k *spanAggregationKey) ToProto() *aggregationpb.SpanAggregationKey {
	var pb aggregationpb.SpanAggregationKey
	pb.SpanName = k.SpanName
	pb.Outcome = k.Outcome
	pb.TargetType = k.TargetType
	pb.TargetName = k.TargetName
	pb.Resource = k.Resource
	return &pb
}
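// Explanatory note (not original code): the name differences above stem
// from protoc's CamelCase field generation (FaasId, CloudAccountId,
// HostOsPlatform) versus Go initialism style in the aggregation keys
// (FAASID, CloudAccountID, HostOSPlatform), so each field is mapped
// explicitly. FAASColdstart is a nullable.Bool with an integer
// underlying type, which is presumably how it carries an unset state
// through the uint32 protobuf field in addition to true and false.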
// FromProto converts protobuf representation to spanAggregationKey.
func (k *spanAggregationKey) FromProto(pb *aggregationpb.SpanAggregationKey) {
	k.SpanName = pb.SpanName
	k.Outcome = pb.Outcome
	k.TargetType = pb.TargetType
	k.TargetName = pb.TargetName
	k.Resource = pb.Resource
}

// ToProto converts overflow to its protobuf representation.
func (o *overflow) ToProto() *aggregationpb.Overflow {
	var pb aggregationpb.Overflow
	if !o.OverflowTransaction.Empty() {
		pb.OverflowTransactions = o.OverflowTransaction.Metrics
		pb.OverflowTransactionsEstimator = hllBytes(o.OverflowTransaction.Estimator)
	}
	if !o.OverflowServiceTransaction.Empty() {
		pb.OverflowServiceTransactions = o.OverflowServiceTransaction.Metrics
		pb.OverflowServiceTransactionsEstimator = hllBytes(o.OverflowServiceTransaction.Estimator)
	}
	if !o.OverflowSpan.Empty() {
		pb.OverflowSpans = o.OverflowSpan.Metrics
		pb.OverflowSpansEstimator = hllBytes(o.OverflowSpan.Estimator)
	}
	return &pb
}

// FromProto converts protobuf representation to overflow.
func (o *overflow) FromProto(pb *aggregationpb.Overflow) {
	if pb.OverflowTransactions != nil {
		o.OverflowTransaction.Estimator = hllSketch(pb.OverflowTransactionsEstimator)
		o.OverflowTransaction.Metrics = pb.OverflowTransactions
		pb.OverflowTransactions = nil
	}
	if pb.OverflowServiceTransactions != nil {
		o.OverflowServiceTransaction.Estimator = hllSketch(pb.OverflowServiceTransactionsEstimator)
		o.OverflowServiceTransaction.Metrics = pb.OverflowServiceTransactions
		pb.OverflowServiceTransactions = nil
	}
	if pb.OverflowSpans != nil {
		o.OverflowSpan.Estimator = hllSketch(pb.OverflowSpansEstimator)
		o.OverflowSpan.Metrics = pb.OverflowSpans
		pb.OverflowSpans = nil
	}
}

// ToProto converts globalLabels to its protobuf representation.
func (gl *globalLabels) ToProto() *aggregationpb.GlobalLabels {
	var pb aggregationpb.GlobalLabels

	// Keys must be sorted to ensure the wire format is deterministically
	// generated and the resulting strings are directly comparable, i.e.
	// the protobuf encodings are equal if and only if the structs are equal.
	pb.Labels = slices.Grow(pb.Labels, len(gl.Labels))[:len(gl.Labels)]
	var i int
	for k, v := range gl.Labels {
		if pb.Labels[i] == nil {
			pb.Labels[i] = &aggregationpb.Label{}
		}
		pb.Labels[i].Key = k
		pb.Labels[i].Value = v.Value
		pb.Labels[i].Values = slices.Grow(pb.Labels[i].Values, len(v.Values))[:len(v.Values)]
		copy(pb.Labels[i].Values, v.Values)
		i++
	}
	sort.Slice(pb.Labels, func(i, j int) bool {
		return pb.Labels[i].Key < pb.Labels[j].Key
	})

	pb.NumericLabels = slices.Grow(pb.NumericLabels, len(gl.NumericLabels))[:len(gl.NumericLabels)]
	i = 0
	for k, v := range gl.NumericLabels {
		if pb.NumericLabels[i] == nil {
			pb.NumericLabels[i] = &aggregationpb.NumericLabel{}
		}
		pb.NumericLabels[i].Key = k
		pb.NumericLabels[i].Value = v.Value
		pb.NumericLabels[i].Values = slices.Grow(pb.NumericLabels[i].Values, len(v.Values))[:len(v.Values)]
		copy(pb.NumericLabels[i].Values, v.Values)
		i++
	}
	sort.Slice(pb.NumericLabels, func(i, j int) bool {
		return pb.NumericLabels[i].Key < pb.NumericLabels[j].Key
	})

	return &pb
}
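// Illustrative consequence of the sorted encoding above (a sketch, not
// original code): because labels are always emitted in key order, two
// globalLabels values holding the same labels and values marshal to
// byte-identical output, so the string form is usable directly as a
// comparable key:
//
//	s1, _ := gl1.MarshalString()
//	s2, _ := gl2.MarshalString()
//	// s1 == s2 if and only if gl1 and gl2 carry the same labels
//	// (with value slices in the same order)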
// FromProto converts protobuf representation to globalLabels.
func (gl *globalLabels) FromProto(pb *aggregationpb.GlobalLabels) {
	gl.Labels = make(modelpb.Labels, len(pb.Labels))
	for _, l := range pb.Labels {
		gl.Labels[l.Key] = &modelpb.LabelValue{Value: l.Value, Global: true}
		gl.Labels[l.Key].Values = slices.Grow(gl.Labels[l.Key].Values, len(l.Values))[:len(l.Values)]
		copy(gl.Labels[l.Key].Values, l.Values)
	}
	gl.NumericLabels = make(modelpb.NumericLabels, len(pb.NumericLabels))
	for _, l := range pb.NumericLabels {
		gl.NumericLabels[l.Key] = &modelpb.NumericLabelValue{Value: l.Value, Global: true}
		gl.NumericLabels[l.Key].Values = slices.Grow(gl.NumericLabels[l.Key].Values, len(l.Values))[:len(l.Values)]
		copy(gl.NumericLabels[l.Key].Values, l.Values)
	}
}

// MarshalBinary marshals globalLabels to binary using protobuf.
func (gl *globalLabels) MarshalBinary() ([]byte, error) {
	if gl.Labels == nil && gl.NumericLabels == nil {
		return nil, nil
	}
	pb := gl.ToProto()
	return pb.MarshalVT()
}

// MarshalString marshals globalLabels to its binary protobuf
// representation, returned as a string.
func (gl *globalLabels) MarshalString() (string, error) {
	b, err := gl.MarshalBinary()
	return string(b), err
}

// UnmarshalBinary unmarshals binary protobuf to globalLabels.
func (gl *globalLabels) UnmarshalBinary(data []byte) error {
	if len(data) == 0 {
		gl.Labels = nil
		gl.NumericLabels = nil
		return nil
	}
	var pb aggregationpb.GlobalLabels
	if err := pb.UnmarshalVT(data); err != nil {
		return err
	}
	gl.FromProto(&pb)
	return nil
}

// UnmarshalString unmarshals a string of binary protobuf to globalLabels.
func (gl *globalLabels) UnmarshalString(data string) error {
	return gl.UnmarshalBinary([]byte(data))
}

// histogramFromProto restores an HDR histogram representation from its
// protobuf form.
func histogramFromProto(h *hdrhistogram.HistogramRepresentation, pb *aggregationpb.HDRHistogram) {
	if pb == nil {
		return
	}
	h.LowestTrackableValue = pb.LowestTrackableValue
	h.HighestTrackableValue = pb.HighestTrackableValue
	h.SignificantFigures = pb.SignificantFigures
	h.CountsRep.Reset()
	for i := 0; i < len(pb.Buckets); i++ {
		h.CountsRep.Add(pb.Buckets[i], pb.Counts[i])
	}
}

// histogramToProto converts an HDR histogram representation to its
// protobuf form.
func histogramToProto(h *hdrhistogram.HistogramRepresentation) *aggregationpb.HDRHistogram {
	if h == nil {
		return nil
	}
	var pb aggregationpb.HDRHistogram
	setHistogramProto(h, &pb)
	return &pb
}

// setHistogramProto copies the histogram representation into pb,
// encoding the populated buckets and their counts as parallel slices.
func setHistogramProto(h *hdrhistogram.HistogramRepresentation, pb *aggregationpb.HDRHistogram) {
	pb.LowestTrackableValue = h.LowestTrackableValue
	pb.HighestTrackableValue = h.HighestTrackableValue
	pb.SignificantFigures = h.SignificantFigures
	pb.Buckets = pb.Buckets[:0]
	pb.Counts = pb.Counts[:0]

	countsLen := h.CountsRep.Len()
	if countsLen > cap(pb.Buckets) {
		pb.Buckets = make([]int32, 0, countsLen)
	}
	if countsLen > cap(pb.Counts) {
		pb.Counts = make([]int64, 0, countsLen)
	}
	h.CountsRep.ForEach(func(bucket int32, count int64) {
		pb.Buckets = append(pb.Buckets, bucket)
		pb.Counts = append(pb.Counts, count)
	})
}

// hllBytes returns the binary encoding of the HyperLogLog sketch, or nil
// if the sketch is nil.
func hllBytes(estimator *hyperloglog.Sketch) []byte {
	if estimator == nil {
		return nil
	}
	// Ignoring the error here since it will always be nil.
	b, _ := estimator.MarshalBinary()
	return b
}

// hllSketchEstimate returns hllSketch(estimator).Estimate() if estimator is
// non-nil, and zero if estimator is nil.
func hllSketchEstimate(estimator []byte) uint64 {
	if sketch := hllSketch(estimator); sketch != nil {
		return sketch.Estimate()
	}
	return 0
}

// hllSketch decodes a HyperLogLog sketch from its binary encoding,
// returning nil for empty input.
func hllSketch(estimator []byte) *hyperloglog.Sketch {
	if len(estimator) == 0 {
		return nil
	}
	var sketch hyperloglog.Sketch
	// Ignoring the returned error here since the error is only returned if
	// the precision is set outside bounds, which is not possible for our case.
	sketch.UnmarshalBinary(estimator)
	return &sketch
}
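// Illustrative sketch of how the HyperLogLog helpers compose (not
// original code; assumes the axiomhq/hyperloglog constructor New14 and
// its Insert method, both part of that package's public API):
//
//	sk := hyperloglog.New14()
//	sk.Insert([]byte("service-a"))
//	sk.Insert([]byte("service-b"))
//	b := hllBytes(sk)         // binary encoding, nil-safe
//	n := hllSketchEstimate(b) // cardinality estimate, approximately 2
//	_ = hllSketch(nil)        // nil for empty input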