exporter/metric/metric.go (725 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package metric import ( "bytes" "context" "encoding/hex" "errors" "fmt" "math" "net/url" "reflect" "sort" "strings" "sync" "time" "unicode" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/trace" monitoring "cloud.google.com/go/monitoring/apiv3/v2" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/genproto/googleapis/api/distribution" "google.golang.org/genproto/googleapis/api/label" googlemetricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping" ) const ( // The number of timeserieses to send to GCM in a single request. This // is a hard limit in the GCM API, so we never want to exceed 200. sendBatchSize = 200 cloudMonitoringMetricDescriptorNameFormat = "workload.googleapis.com/%s" platformMappingMonitoredResourceKey = "gcp.resource_type" ) // key is used to judge the uniqueness of the record descriptor. type key struct { name string libraryname string } func keyOf(metrics metricdata.Metrics, library instrumentation.Scope) key { return key{ name: metrics.Name, libraryname: library.Name, } } // metricExporter is the implementation of OpenTelemetry metric exporter for // Google Cloud Monitoring. type metricExporter struct { o *options shutdown chan struct{} // mdCache is the cache to hold MetricDescriptor to avoid creating duplicate MD. mdCache map[key]*googlemetricpb.MetricDescriptor client *monitoring.MetricClient mdLock sync.RWMutex shutdownOnce sync.Once } // ForceFlush does nothing, the exporter holds no state. func (e *metricExporter) ForceFlush(ctx context.Context) error { return ctx.Err() } // Shutdown shuts down the client connections. func (e *metricExporter) Shutdown(ctx context.Context) error { err := errShutdown e.shutdownOnce.Do(func() { close(e.shutdown) err = errors.Join(ctx.Err(), e.client.Close()) }) return err } // newMetricExporter returns an exporter that uploads OTel metric data to Google Cloud Monitoring. func newMetricExporter(o *options) (*metricExporter, error) { if strings.TrimSpace(o.projectID) == "" { return nil, errBlankProjectID } clientOpts := append([]option.ClientOption{option.WithGRPCDialOption(grpc.WithUserAgent(userAgent))}, o.monitoringClientOptions...) ctx := o.context if ctx == nil { ctx = context.Background() } client, err := monitoring.NewMetricClient(ctx, clientOpts...) if err != nil { return nil, err } if o.compression == "gzip" { client.CallOptions.GetMetricDescriptor = append(client.CallOptions.GetMetricDescriptor, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) client.CallOptions.CreateMetricDescriptor = append(client.CallOptions.CreateMetricDescriptor, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) client.CallOptions.CreateTimeSeries = append(client.CallOptions.CreateTimeSeries, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) client.CallOptions.CreateServiceTimeSeries = append(client.CallOptions.CreateServiceTimeSeries, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) } cache := map[key]*googlemetricpb.MetricDescriptor{} e := &metricExporter{ o: o, mdCache: cache, client: client, shutdown: make(chan struct{}), } return e, nil } var errShutdown = fmt.Errorf("exporter is shutdown") // Export exports OpenTelemetry Metrics to Google Cloud Monitoring. func (me *metricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { case <-me.shutdown: return errShutdown default: } if me.o.destinationProjectQuota { ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-goog-user-project": strings.TrimPrefix(me.o.projectID, "projects/")})) } return errors.Join( me.exportMetricDescriptor(ctx, rm), me.exportTimeSeries(ctx, rm), ) } // Temporality returns the Temporality to use for an instrument kind. func (me *metricExporter) Temporality(ik metric.InstrumentKind) metricdata.Temporality { return metric.DefaultTemporalitySelector(ik) } // Aggregation returns the Aggregation to use for an instrument kind. func (me *metricExporter) Aggregation(ik metric.InstrumentKind) metric.Aggregation { return metric.DefaultAggregationSelector(ik) } // exportMetricDescriptor create MetricDescriptor from the record // if the descriptor is not registered in Cloud Monitoring yet. func (me *metricExporter) exportMetricDescriptor(ctx context.Context, rm *metricdata.ResourceMetrics) error { // We only send metric descriptors if we're configured *and* we're not sending service timeseries. if me.o.disableCreateMetricDescriptors { return nil } me.mdLock.Lock() defer me.mdLock.Unlock() mds := make(map[key]*googlemetricpb.MetricDescriptor) extraLabels := me.extraLabelsFromResource(rm.Resource) for _, scope := range rm.ScopeMetrics { for _, metrics := range scope.Metrics { k := keyOf(metrics, scope.Scope) if _, ok := me.mdCache[k]; ok { continue } if _, localok := mds[k]; !localok { md := me.recordToMdpb(metrics, extraLabels) mds[k] = md } } } // TODO: This process is synchronous and blocks longer time if records in cps // have many different descriptors. In the cps.ForEach above, it should spawn // goroutines to send CreateMetricDescriptorRequest asynchronously in the case // the descriptor does not exist in global cache (me.mdCache). // See details in #26. var errs []error for kmd, md := range mds { err := me.createMetricDescriptorIfNeeded(ctx, md) if err == nil { me.mdCache[kmd] = md } errs = append(errs, err) } return errors.Join(errs...) } func (me *metricExporter) createMetricDescriptorIfNeeded(ctx context.Context, md *googlemetricpb.MetricDescriptor) error { mdReq := &monitoringpb.GetMetricDescriptorRequest{ Name: fmt.Sprintf("projects/%s/metricDescriptors/%s", me.o.projectID, md.Type), } _, err := me.client.GetMetricDescriptor(ctx, mdReq) if err == nil { // If the metric descriptor already exists, skip the CreateMetricDescriptor call. // Metric descriptors cannot be updated without deleting them first, so there // isn't anything we can do here: // https://cloud.google.com/monitoring/custom-metrics/creating-metrics#md-modify return nil } req := &monitoringpb.CreateMetricDescriptorRequest{ Name: fmt.Sprintf("projects/%s", me.o.projectID), MetricDescriptor: md, } _, err = me.client.CreateMetricDescriptor(ctx, req) return err } // exportTimeSeries create TimeSeries from the records in cps. // res should be the common resource among all TimeSeries, such as instance id, application name and so on. func (me *metricExporter) exportTimeSeries(ctx context.Context, rm *metricdata.ResourceMetrics) error { tss, err := me.recordsToTspbs(rm) if len(tss) == 0 { return err } name := fmt.Sprintf("projects/%s", me.o.projectID) errs := []error{err} for i := 0; i < len(tss); i += sendBatchSize { j := i + sendBatchSize if j >= len(tss) { j = len(tss) } // TODO: When this exporter is rewritten, support writing to multiple // projects based on the "gcp.project.id" resource. req := &monitoringpb.CreateTimeSeriesRequest{ Name: name, TimeSeries: tss[i:j], } if me.o.createServiceTimeSeries { errs = append(errs, me.client.CreateServiceTimeSeries(ctx, req)) } else { errs = append(errs, me.client.CreateTimeSeries(ctx, req)) } } return errors.Join(errs...) } func (me *metricExporter) extraLabelsFromResource(res *resource.Resource) *attribute.Set { set, _ := attribute.NewSetWithFiltered(res.Attributes(), me.o.resourceAttributeFilter) return &set } // descToMetricType converts descriptor to MetricType proto type. // Basically this returns default value ("workload.googleapis.com/[metric type]"). func (me *metricExporter) descToMetricType(desc metricdata.Metrics) string { if formatter := me.o.metricDescriptorTypeFormatter; formatter != nil { return formatter(desc) } return fmt.Sprintf(cloudMonitoringMetricDescriptorNameFormat, desc.Name) } // metricTypeToDisplayName takes a GCM metric type, like (workload.googleapis.com/MyCoolMetric) and returns the display name. func metricTypeToDisplayName(mURL string) string { // strip domain, keep path after domain. u, err := url.Parse(fmt.Sprintf("metrics://%s", mURL)) if err != nil || u.Path == "" { return mURL } return strings.TrimLeft(u.Path, "/") } // recordToMdpb extracts data and converts them to googlemetricpb.MetricDescriptor. func (me *metricExporter) recordToMdpb(metrics metricdata.Metrics, extraLabels *attribute.Set) *googlemetricpb.MetricDescriptor { name := metrics.Name typ := me.descToMetricType(metrics) kind, valueType := recordToMdpbKindType(metrics.Data) // Detailed explanations on MetricDescriptor proto is not documented on // generated Go packages. Refer to the original proto file. // https://github.com/googleapis/googleapis/blob/50af053/google/api/metric.proto#L33 return &googlemetricpb.MetricDescriptor{ Name: name, DisplayName: metricTypeToDisplayName(typ), Type: typ, MetricKind: kind, ValueType: valueType, Unit: string(metrics.Unit), Description: metrics.Description, Labels: labelDescriptors(metrics, extraLabels), } } func labelDescriptors(metrics metricdata.Metrics, extraLabels *attribute.Set) []*label.LabelDescriptor { labels := []*label.LabelDescriptor{} seenKeys := map[string]struct{}{} addAttributes := func(attr *attribute.Set) { iter := attr.Iter() for iter.Next() { kv := iter.Attribute() // Skip keys that have already been set if _, ok := seenKeys[normalizeLabelKey(string(kv.Key))]; ok { continue } labels = append(labels, &label.LabelDescriptor{ Key: normalizeLabelKey(string(kv.Key)), }) seenKeys[normalizeLabelKey(string(kv.Key))] = struct{}{} } } addAttributes(extraLabels) switch a := metrics.Data.(type) { case metricdata.Gauge[int64]: for _, pt := range a.DataPoints { addAttributes(&pt.Attributes) } case metricdata.Gauge[float64]: for _, pt := range a.DataPoints { addAttributes(&pt.Attributes) } case metricdata.Sum[int64]: for _, pt := range a.DataPoints { addAttributes(&pt.Attributes) } case metricdata.Sum[float64]: for _, pt := range a.DataPoints { addAttributes(&pt.Attributes) } case metricdata.Histogram[float64]: for _, pt := range a.DataPoints { addAttributes(&pt.Attributes) } case metricdata.Histogram[int64]: for _, pt := range a.DataPoints { addAttributes(&pt.Attributes) } } return labels } type attributes struct { attrs attribute.Set } func (attrs *attributes) GetString(key string) (string, bool) { value, ok := attrs.attrs.Value(attribute.Key(key)) return value.AsString(), ok } // resourceToMonitoredResourcepb converts resource in OTel to MonitoredResource // proto type for Cloud Monitoring. // // https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.monitoredResourceDescriptors func (me *metricExporter) resourceToMonitoredResourcepb(res *resource.Resource) *monitoredrespb.MonitoredResource { platformMrType, platformMappingRequested := res.Set().Value(platformMappingMonitoredResourceKey) // check if platform mapping is requested and possible if platformMappingRequested && platformMrType.AsString() == me.o.monitoredResourceDescription.mrType { // assemble attributes required to construct this MR attributeMap := make(map[string]string) for expectedLabel := range me.o.monitoredResourceDescription.mrLabels { value, found := res.Set().Value(attribute.Key(expectedLabel)) if found { attributeMap[expectedLabel] = value.AsString() } } return &monitoredrespb.MonitoredResource{ Type: platformMrType.AsString(), Labels: attributeMap, } } gmr := resourcemapping.ResourceAttributesToMonitoringMonitoredResource(&attributes{ attrs: attribute.NewSet(res.Attributes()...), }) newLabels := make(map[string]string, len(gmr.Labels)) for k, v := range gmr.Labels { newLabels[k] = sanitizeUTF8(v) } mr := &monitoredrespb.MonitoredResource{ Type: gmr.Type, Labels: newLabels, } return mr } // recordToMdpbKindType return the mapping from OTel's record descriptor to // Cloud Monitoring's MetricKind and ValueType. func recordToMdpbKindType(a metricdata.Aggregation) (googlemetricpb.MetricDescriptor_MetricKind, googlemetricpb.MetricDescriptor_ValueType) { switch agg := a.(type) { case metricdata.Gauge[int64]: return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_INT64 case metricdata.Gauge[float64]: return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DOUBLE case metricdata.Sum[int64]: if agg.IsMonotonic { return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_INT64 } return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_INT64 case metricdata.Sum[float64]: if agg.IsMonotonic { return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DOUBLE } return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DOUBLE case metricdata.Histogram[int64], metricdata.Histogram[float64]: return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DISTRIBUTION default: return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED } } // recordToMpb converts data from records to Metric proto type for Cloud Monitoring. func (me *metricExporter) recordToMpb(metrics metricdata.Metrics, attributes attribute.Set, library instrumentation.Scope, extraLabels *attribute.Set) *googlemetricpb.Metric { me.mdLock.RLock() defer me.mdLock.RUnlock() k := keyOf(metrics, library) md, ok := me.mdCache[k] if !ok { md = me.recordToMdpb(metrics, extraLabels) } labels := make(map[string]string) addAttributes := func(attr *attribute.Set) { iter := attr.Iter() for iter.Next() { kv := iter.Attribute() labels[normalizeLabelKey(string(kv.Key))] = sanitizeUTF8(kv.Value.Emit()) } } addAttributes(extraLabels) addAttributes(&attributes) return &googlemetricpb.Metric{ Type: md.Type, Labels: labels, } } // recordToTspb converts record to TimeSeries proto type with common resource. // ref. https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries func (me *metricExporter) recordToTspb(m metricdata.Metrics, mr *monitoredrespb.MonitoredResource, library instrumentation.Scope, extraLabels *attribute.Set) ([]*monitoringpb.TimeSeries, error) { var tss []*monitoringpb.TimeSeries var errs []error if m.Data == nil { return nil, nil } switch a := m.Data.(type) { case metricdata.Gauge[int64]: for _, point := range a.DataPoints { ts, err := gaugeToTimeSeries[int64](point, m, mr) if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.Gauge[float64]: for _, point := range a.DataPoints { ts, err := gaugeToTimeSeries[float64](point, m, mr) if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.Sum[int64]: for _, point := range a.DataPoints { var ts *monitoringpb.TimeSeries var err error if a.IsMonotonic { ts, err = sumToTimeSeries[int64](point, m, mr) } else { // Send non-monotonic sums as gauges ts, err = gaugeToTimeSeries[int64](point, m, mr) } if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.Sum[float64]: for _, point := range a.DataPoints { var ts *monitoringpb.TimeSeries var err error if a.IsMonotonic { ts, err = sumToTimeSeries[float64](point, m, mr) } else { // Send non-monotonic sums as gauges ts, err = gaugeToTimeSeries[float64](point, m, mr) } if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.Histogram[int64]: for _, point := range a.DataPoints { ts, err := histogramToTimeSeries(point, m, mr, me.o.enableSumOfSquaredDeviation, me.o.projectID) if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.Histogram[float64]: for _, point := range a.DataPoints { ts, err := histogramToTimeSeries(point, m, mr, me.o.enableSumOfSquaredDeviation, me.o.projectID) if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.ExponentialHistogram[int64]: for _, point := range a.DataPoints { ts, err := expHistogramToTimeSeries(point, m, mr, me.o.enableSumOfSquaredDeviation, me.o.projectID) if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } case metricdata.ExponentialHistogram[float64]: for _, point := range a.DataPoints { ts, err := expHistogramToTimeSeries(point, m, mr, me.o.enableSumOfSquaredDeviation, me.o.projectID) if err != nil { errs = append(errs, err) continue } ts.Metric = me.recordToMpb(m, point.Attributes, library, extraLabels) tss = append(tss, ts) } default: errs = append(errs, errUnexpectedAggregationKind{kind: reflect.TypeOf(m.Data).String()}) } return tss, errors.Join(errs...) } func (me *metricExporter) recordsToTspbs(rm *metricdata.ResourceMetrics) ([]*monitoringpb.TimeSeries, error) { mr := me.resourceToMonitoredResourcepb(rm.Resource) extraLabels := me.extraLabelsFromResource(rm.Resource) var ( tss []*monitoringpb.TimeSeries errs []error ) for _, scope := range rm.ScopeMetrics { for _, metrics := range scope.Metrics { ts, err := me.recordToTspb(metrics, mr, scope.Scope, extraLabels) errs = append(errs, err) tss = append(tss, ts...) } } return tss, errors.Join(errs...) } func sanitizeUTF8(s string) string { return strings.ToValidUTF8(s, "�") } func gaugeToTimeSeries[N int64 | float64](point metricdata.DataPoint[N], metrics metricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { value, valueType := numberDataPointToValue(point) timestamp := timestamppb.New(point.Time) if err := timestamp.CheckValid(); err != nil { return nil, err } return &monitoringpb.TimeSeries{ Resource: mr, Unit: string(metrics.Unit), MetricKind: googlemetricpb.MetricDescriptor_GAUGE, ValueType: valueType, Points: []*monitoringpb.Point{{ Interval: &monitoringpb.TimeInterval{ EndTime: timestamp, }, Value: value, }}, }, nil } func sumToTimeSeries[N int64 | float64](point metricdata.DataPoint[N], metrics metricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) if err != nil { return nil, err } value, valueType := numberDataPointToValue[N](point) return &monitoringpb.TimeSeries{ Resource: mr, Unit: string(metrics.Unit), MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, ValueType: valueType, Points: []*monitoringpb.Point{{ Interval: interval, Value: value, }}, }, nil } // TODO(@dashpole): Refactor to pass control-coupling lint check. // //nolint:revive func histogramToTimeSeries[N int64 | float64](point metricdata.HistogramDataPoint[N], metrics metricdata.Metrics, mr *monitoredrespb.MonitoredResource, enableSOSD bool, projectID string) (*monitoringpb.TimeSeries, error) { interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) if err != nil { return nil, err } distributionValue := histToDistribution(point, projectID) if enableSOSD { setSumOfSquaredDeviation(point, distributionValue) } return &monitoringpb.TimeSeries{ Resource: mr, Unit: string(metrics.Unit), MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, ValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, Points: []*monitoringpb.Point{{ Interval: interval, Value: &monitoringpb.TypedValue{ Value: &monitoringpb.TypedValue_DistributionValue{ DistributionValue: distributionValue, }, }, }}, }, nil } func expHistogramToTimeSeries[N int64 | float64](point metricdata.ExponentialHistogramDataPoint[N], metrics metricdata.Metrics, mr *monitoredrespb.MonitoredResource, enableSOSD bool, projectID string) (*monitoringpb.TimeSeries, error) { interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) if err != nil { return nil, err } distributionValue := expHistToDistribution(point, projectID) // TODO: Implement "setSumOfSquaredDeviationExpHist" for parameter "enableSOSD" functionality. return &monitoringpb.TimeSeries{ Resource: mr, Unit: string(metrics.Unit), MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, ValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, Points: []*monitoringpb.Point{{ Interval: interval, Value: &monitoringpb.TypedValue{ Value: &monitoringpb.TypedValue_DistributionValue{ DistributionValue: distributionValue, }, }, }}, }, nil } func toNonemptyTimeIntervalpb(start, end time.Time) (*monitoringpb.TimeInterval, error) { // The end time of a new interval must be at least a millisecond after the end time of the // previous interval, for all non-gauge types. // https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#timeinterval if end.Sub(start).Milliseconds() <= 1 { end = start.Add(time.Millisecond) } startpb := timestamppb.New(start) endpb := timestamppb.New(end) err := errors.Join( startpb.CheckValid(), endpb.CheckValid(), ) if err != nil { return nil, err } return &monitoringpb.TimeInterval{ StartTime: startpb, EndTime: endpb, }, nil } func histToDistribution[N int64 | float64](hist metricdata.HistogramDataPoint[N], projectID string) *distribution.Distribution { counts := make([]int64, len(hist.BucketCounts)) for i, v := range hist.BucketCounts { counts[i] = int64(v) } var mean float64 if !math.IsNaN(float64(hist.Sum)) && hist.Count > 0 { // Avoid divide-by-zero mean = float64(hist.Sum) / float64(hist.Count) } return &distribution.Distribution{ Count: int64(hist.Count), Mean: mean, BucketCounts: counts, BucketOptions: &distribution.Distribution_BucketOptions{ Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ Bounds: hist.Bounds, }, }, }, Exemplars: toDistributionExemplar[N](hist.Exemplars, projectID), } } func expHistToDistribution[N int64 | float64](hist metricdata.ExponentialHistogramDataPoint[N], projectID string) *distribution.Distribution { // First calculate underflow bucket with all negatives + zeros. underflow := hist.ZeroCount negativeBuckets := hist.NegativeBucket.Counts for i := 0; i < len(negativeBuckets); i++ { underflow += negativeBuckets[i] } // Next, pull in remaining buckets. counts := make([]int64, len(hist.PositiveBucket.Counts)+2) bucketOptions := &distribution.Distribution_BucketOptions{} counts[0] = int64(underflow) positiveBuckets := hist.PositiveBucket.Counts for i := 0; i < len(positiveBuckets); i++ { counts[i+1] = int64(positiveBuckets[i]) } // Overflow bucket is always empty counts[len(counts)-1] = 0 if len(hist.PositiveBucket.Counts) == 0 { // We cannot send exponential distributions with no positive buckets, // instead we send a simple overflow/underflow histogram. bucketOptions.Options = &distribution.Distribution_BucketOptions_ExplicitBuckets{ ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ Bounds: []float64{0}, }, } } else { // Exponential histogram growth := math.Exp2(math.Exp2(-float64(hist.Scale))) scale := math.Pow(growth, float64(hist.PositiveBucket.Offset)) bucketOptions.Options = &distribution.Distribution_BucketOptions_ExponentialBuckets{ ExponentialBuckets: &distribution.Distribution_BucketOptions_Exponential{ GrowthFactor: growth, Scale: scale, NumFiniteBuckets: int32(len(counts) - 2), }, } } var mean float64 if !math.IsNaN(float64(hist.Sum)) && hist.Count > 0 { // Avoid divide-by-zero mean = float64(hist.Sum) / float64(hist.Count) } return &distribution.Distribution{ Count: int64(hist.Count), Mean: mean, BucketCounts: counts, BucketOptions: bucketOptions, Exemplars: toDistributionExemplar[N](hist.Exemplars, projectID), } } func toDistributionExemplar[N int64 | float64](Exemplars []metricdata.Exemplar[N], projectID string) []*distribution.Distribution_Exemplar { var exemplars []*distribution.Distribution_Exemplar for _, e := range Exemplars { attachments := []*anypb.Any{} if hasValidSpanContext(e) { sctx, err := anypb.New(&monitoringpb.SpanContext{ SpanName: fmt.Sprintf("projects/%s/traces/%s/spans/%s", projectID, hex.EncodeToString(e.TraceID[:]), hex.EncodeToString(e.SpanID[:])), }) if err == nil { attachments = append(attachments, sctx) } } if len(e.FilteredAttributes) > 0 { attr, err := anypb.New(&monitoringpb.DroppedLabels{ Label: attributesToLabels(e.FilteredAttributes), }) if err == nil { attachments = append(attachments, attr) } } exemplars = append(exemplars, &distribution.Distribution_Exemplar{ Value: float64(e.Value), Timestamp: timestamppb.New(e.Time), Attachments: attachments, }) } sort.Slice(exemplars, func(i, j int) bool { return exemplars[i].Value < exemplars[j].Value }) return exemplars } func attributesToLabels(attrs []attribute.KeyValue) map[string]string { labels := make(map[string]string, len(attrs)) for _, attr := range attrs { labels[normalizeLabelKey(string(attr.Key))] = sanitizeUTF8(attr.Value.Emit()) } return labels } var ( nilTraceID trace.TraceID nilSpanID trace.SpanID ) func hasValidSpanContext[N int64 | float64](e metricdata.Exemplar[N]) bool { return !bytes.Equal(e.TraceID[:], nilTraceID[:]) && !bytes.Equal(e.SpanID[:], nilSpanID[:]) } func setSumOfSquaredDeviation[N int64 | float64](hist metricdata.HistogramDataPoint[N], dist *distribution.Distribution) { var prevBound float64 // Calculate the sum of squared deviation. for i := 0; i < len(hist.Bounds); i++ { // Assume all points in the bucket occur at the middle of the bucket range middleOfBucket := (prevBound + hist.Bounds[i]) / 2 dist.SumOfSquaredDeviation += float64(dist.BucketCounts[i]) * (middleOfBucket - dist.Mean) * (middleOfBucket - dist.Mean) prevBound = hist.Bounds[i] } // The infinity bucket is an implicit +Inf bound after the list of explicit bounds. // Assume points in the infinity bucket are at the top of the previous bucket middleOfInfBucket := prevBound if len(dist.BucketCounts) > 0 { dist.SumOfSquaredDeviation += float64(dist.BucketCounts[len(dist.BucketCounts)-1]) * (middleOfInfBucket - dist.Mean) * (middleOfInfBucket - dist.Mean) } } func numberDataPointToValue[N int64 | float64]( point metricdata.DataPoint[N], ) (*monitoringpb.TypedValue, googlemetricpb.MetricDescriptor_ValueType) { switch v := any(point.Value).(type) { case int64: return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ Int64Value: v, }}, googlemetricpb.MetricDescriptor_INT64 case float64: return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ DoubleValue: v, }}, googlemetricpb.MetricDescriptor_DOUBLE } // It is impossible to reach this statement return nil, googlemetricpb.MetricDescriptor_INT64 } // https://github.com/googleapis/googleapis/blob/c4c562f89acce603fb189679836712d08c7f8584/google/api/metric.proto#L149 // // > The label key name must follow: // > // > * Only upper and lower-case letters, digits and underscores (_) are // > allowed. // > * Label name must start with a letter or digit. // > * The maximum length of a label name is 100 characters. // // Note: this does not truncate if a label is too long. func normalizeLabelKey(s string) string { if len(s) == 0 { return s } s = strings.Map(sanitizeRune, s) if unicode.IsDigit(rune(s[0])) { s = "key_" + s } return s } // converts anything that is not a letter or digit to an underscore. func sanitizeRune(r rune) rune { if unicode.IsLetter(r) || unicode.IsDigit(r) { return r } // Everything else turns into an underscore return '_' }