exporter/collector/metrics.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains the rewritten googlecloud metrics exporter, which no
// longer depends on the OpenCensus stackdriver exporter.
package collector
import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"net/url"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"unicode"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/genproto/googleapis/api/distribution"
"google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/attribute"
metricapi "go.opentelemetry.io/otel/metric"
"github.com/fsnotify/fsnotify"
"github.com/tidwall/wal"
"go.uber.org/zap"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/datapointstorage"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/normalization"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping"
)
// selfObservability holds the logger and meter provider used for self-observability reporting.
type selfObservability struct {
// Logger to use for this exporter.
log *zap.Logger
meterProvider metricapi.MeterProvider
}
// MetricsExporter is the GCM exporter that uses pdata directly.
type MetricsExporter struct {
mapper metricMapper
obs selfObservability
client monitoringClient
// self-observability metrics
pointsExportedCounter metricapi.Int64Counter
// Only used for testing purposes in lieu of initializing a fake client
exportFunc func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error
// A channel that receives metric descriptors and sends them to GCM once
metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
// wal is a write-ahead log that persists requests so the exporter can retry them across network outages
wal *exporterWAL
// mdCache tracks the metric descriptors that have already been sent to GCM
mdCache map[string]*monitoringpb.CreateMetricDescriptorRequest
// shutdownC is a channel for signaling a graceful shutdown
shutdownC chan struct{}
// requestOpts applies options to the context for requests, such as additional headers.
requestOpts []func(*context.Context, requestInfo)
cfg Config
// goroutines tracks the currently running child tasks
goroutines sync.WaitGroup
timeout time.Duration
}
type exporterWAL struct {
*wal.Log
// the full path of the WAL (user-configured directory + "gcp_metrics_wal")
path string
maxBackoff time.Duration
mutex sync.Mutex
}
// requestInfo is meant to abstract info from CreateMetricDescriptorRequests and
// CreateTimeSeriesRequests that is shared by requestOpts functions.
type requestInfo struct {
projectName string
}
// metricMapper is the part that transforms metrics. It is separate from
// MetricsExporter since it contains only pure functions.
type metricMapper struct {
normalizer normalization.Normalizer
obs selfObservability
exemplarAttachmentDropCount metricapi.Int64Counter
cfg Config
}
// Constants we use when translating summary metrics into GCP.
const (
SummaryCountPrefix = "_count" // despite the "Prefix" name, this is appended as a suffix
SummarySumSuffix = "_sum"
scopeName = "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector"
)
const (
// The maximum number of time series to send to GCM in a single request. This
// is a hard limit in the GCM API, so we never want to exceed 200.
sendBatchSize = 200
// The default maximum amount of time to keep retrying a data point during a
// network outage (when the WAL is enabled) before discarding it. Users can
// override this by setting MetricConfig.WALConfig.MaxBackoff.
defaultWalMaxBackoff = 3600 * time.Second
)
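// For illustration: with sendBatchSize = 200, a project with 450 pending time
// series is flushed as three CreateTimeSeriesRequests of 200, 200, and 50
// points by the batching loop in PushMetrics.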
const (
// The specific unit that needs to be present in an integer-valued metric so
// that it can be treated as a boolean.
specialIntToBoolUnit = "{gcp.BOOL}"
)
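// For illustration: an integer gauge whose unit is "{gcp.BOOL}" is exported as
// a BOOL-valued time series; an IntValue of 0 maps to false and any non-zero
// value maps to true (see convertToBoolIfMetricKindSupported below).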
type labels map[string]string
// monitoringClient is the subset of monitoring.MetricClient this exporter uses,
// and allows us to mock the implementation for testing.
type monitoringClient interface {
CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
CreateServiceTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
Close() error
CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error)
}
func (me *MetricsExporter) Shutdown(ctx context.Context) error {
// TODO: pass ctx to goroutines so that we can use its deadline
close(me.shutdownC)
c := make(chan struct{})
go func() {
// Wait until all goroutines are done
me.goroutines.Wait()
close(c)
}()
select {
case <-ctx.Done():
me.obs.log.Error("Error waiting for async tasks to finish.", zap.Error(ctx.Err()))
case <-c:
}
if me.client != nil {
return me.client.Close()
}
return nil
}
func NewGoogleCloudMetricsExporter(
ctx context.Context,
cfg Config,
set exporter.Settings,
timeout time.Duration,
) (*MetricsExporter, error) {
SetUserAgent(&cfg, set.BuildInfo)
meter := set.TelemetrySettings.MeterProvider.Meter(scopeName, metricapi.WithInstrumentationVersion(Version()))
pointsExportedCounter, err := meter.Int64Counter(
"googlecloudmonitoring/point_count",
metricapi.WithDescription("Count of metric points written to Cloud Monitoring."),
metricapi.WithUnit("1"),
)
if err != nil {
return nil, err
}
exemplarAttachmentDropCount, err := meter.Int64Counter(
"googlecloudmonitoring/exemplar_attachments_dropped",
metricapi.WithDescription("Count of exemplar attachments dropped."),
metricapi.WithUnit("{attachments}"),
)
if err != nil {
return nil, err
}
obs := selfObservability{
log: set.TelemetrySettings.Logger,
meterProvider: set.TelemetrySettings.MeterProvider,
}
normalizer := normalization.NewDisabledNormalizer()
mExp := &MetricsExporter{
cfg: cfg,
obs: obs,
mapper: metricMapper{
obs: obs,
cfg: cfg,
normalizer: normalizer,
exemplarAttachmentDropCount: exemplarAttachmentDropCount,
},
// We create a buffered channel for metric descriptors.
// MetricDescriptors are sent asynchronously and optimistically.
// We only get Unit/Description/DisplayName from them, so it's OK
// to drop them and conserve resources for sending timeseries.
metricDescriptorC: make(chan *monitoringpb.CreateMetricDescriptorRequest, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
mdCache: make(map[string]*monitoringpb.CreateMetricDescriptorRequest),
shutdownC: make(chan struct{}),
timeout: timeout,
pointsExportedCounter: pointsExportedCounter,
}
mExp.exportFunc = mExp.exportToTimeSeries
mExp.requestOpts = make([]func(*context.Context, requestInfo), 0)
if cfg.DestinationProjectQuota {
mExp.requestOpts = append(mExp.requestOpts, func(ctx *context.Context, ri requestInfo) {
*ctx = metadata.NewOutgoingContext(*ctx, metadata.New(map[string]string{"x-goog-user-project": strings.TrimPrefix(ri.projectName, "projects/")}))
})
}
return mExp, nil
}
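// A minimal construction sketch (hypothetical wiring; DefaultConfig and the
// exporter.Settings value `set` are assumed to come from the surrounding
// collector setup):
//
//	cfg := DefaultConfig()
//	cfg.ProjectID = "my-project" // hypothetical project ID
//	exp, err := NewGoogleCloudMetricsExporter(ctx, cfg, set, 30*time.Second)
//	if err != nil {
//		return err
//	}
//	if err := exp.Start(ctx, nil); err != nil {
//		return err
//	}
//	defer exp.Shutdown(ctx)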
func (me *MetricsExporter) Start(ctx context.Context, _ component.Host) error {
me.shutdownC = make(chan struct{})
if me.cfg.MetricConfig.CumulativeNormalization {
me.mapper.normalizer = normalization.NewStandardNormalizer(me.shutdownC, me.obs.log)
}
clientOpts, err := generateClientOptions(ctx, &me.cfg.MetricConfig.ClientConfig, &me.cfg, monitoring.DefaultAuthScopes(), me.obs.meterProvider)
if err != nil {
return err
}
client, err := monitoring.NewMetricClient(ctx, clientOpts...)
if err != nil {
return err
}
if me.cfg.MetricConfig.ClientConfig.Compression == gzip.Name {
client.CallOptions.CreateMetricDescriptor = append(client.CallOptions.CreateMetricDescriptor,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
client.CallOptions.CreateTimeSeries = append(client.CallOptions.CreateTimeSeries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
client.CallOptions.CreateServiceTimeSeries = append(client.CallOptions.CreateServiceTimeSeries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}
me.client = client
if me.cfg.MetricConfig.WALConfig != nil {
_, _, err = me.setupWAL()
if err != nil {
return err
}
// start WAL popper routine
me.goroutines.Add(1)
go me.runWALReadAndExportLoop(ctx)
}
// Fire up the metric descriptor exporter.
me.goroutines.Add(1)
go me.exportMetricDescriptorRunner()
return nil
}
// setupWAL creates the WAL.
// This function is also used to re-sync after writes, so it closes the existing WAL if present.
// It returns the FirstIndex, LastIndex, and any error.
func (me *MetricsExporter) setupWAL() (uint64, uint64, error) {
err := me.closeWAL()
if err != nil {
return 0, 0, err
}
if me.wal == nil {
me.wal = &exporterWAL{}
}
walPath := filepath.Join(me.cfg.MetricConfig.WALConfig.Directory, "gcp_metrics_wal")
me.wal.path = walPath
metricWal, err := wal.Open(walPath, &wal.Options{LogFormat: 1})
if err != nil {
return 0, 0, err
}
me.wal.Log = metricWal
// default to a 1 hour maximum backoff
me.wal.maxBackoff = defaultWalMaxBackoff
if me.cfg.MetricConfig.WALConfig.MaxBackoff != 0 {
me.wal.maxBackoff = me.cfg.MetricConfig.WALConfig.MaxBackoff
}
// sync existing WAL indices
rIndex, err := me.wal.FirstIndex()
if err != nil {
return 0, 0, err
}
wIndex, err := me.wal.LastIndex()
if err != nil {
return 0, 0, err
}
return rIndex, wIndex, nil
}
func (me *MetricsExporter) closeWAL() error {
if me.wal != nil && me.wal.Log != nil {
err := me.wal.Log.Close()
me.wal.Log = nil
return err
}
return nil
}
// PushMetrics pushes pdata metrics to GCM, creating metric descriptors if necessary.
func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) error {
if me.client == nil {
return errors.New("not started")
}
if me.wal != nil {
me.wal.mutex.Lock()
defer me.wal.mutex.Unlock()
}
// map from project -> []timeseries. This groups timeseries by the project
// they need to be sent to. Each project's timeseries are sent in a
// separate request later.
pendingTimeSeries := map[string][]*monitoringpb.TimeSeries{}
// add extra metrics from the ExtraMetrics() extension point
if me.cfg.MetricConfig.ExtraMetrics != nil {
me.cfg.MetricConfig.ExtraMetrics(m)
}
rms := m.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
monitoredResource := me.cfg.MetricConfig.MapMonitoredResource(rm.Resource())
extraResourceLabels := attributesToLabels(filterAttributes(rm.Resource().Attributes(), me.cfg.MetricConfig.ServiceResourceLabels, me.cfg.MetricConfig.ResourceFilters))
projectID := me.cfg.ProjectID
// override project ID with gcp.project.id, if present
if projectFromResource, found := rm.Resource().Attributes().Get(resourcemapping.ProjectIDAttributeKey); found {
projectID = projectFromResource.AsString()
}
sms := rm.ScopeMetrics()
for j := 0; j < sms.Len(); j++ {
sm := sms.At(j)
instrumentationScopeLabels := me.mapper.instrumentationScopeToLabels(sm.Scope())
metricLabels := mergeLabels(nil, instrumentationScopeLabels, extraResourceLabels)
mes := sm.Metrics()
for k := 0; k < mes.Len(); k++ {
metric := mes.At(k)
pendingTimeSeries[projectID] = append(pendingTimeSeries[projectID], me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric, projectID)...)
// We only send metric descriptors if configured to do so *and* we're not sending service timeseries.
if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
continue
}
for _, md := range me.mapper.metricDescriptor(metric, metricLabels) {
if md == nil {
continue
}
req := &monitoringpb.CreateMetricDescriptorRequest{
Name: projectName(projectID),
MetricDescriptor: md,
}
select {
case me.metricDescriptorC <- req:
default:
// Ignore drops; we'll catch the descriptor next time around.
}
}
}
}
}
var errs []error
// timeseries for each project are batched and exported separately
for projectID, projectTS := range pendingTimeSeries {
// Batch and export
for len(projectTS) > 0 {
var sendSize int
if len(projectTS) < sendBatchSize {
sendSize = len(projectTS)
} else {
sendSize = sendBatchSize
}
var ts []*monitoringpb.TimeSeries
ts, projectTS = projectTS[:sendSize], projectTS[sendSize:]
req := &monitoringpb.CreateTimeSeriesRequest{
Name: projectName(projectID),
TimeSeries: ts,
}
if me.wal != nil {
// push request onto the WAL
bytes, err := proto.Marshal(req)
if err != nil {
errs = append(errs, fmt.Errorf("failed to marshal protobuf to bytes: %+v", err))
continue
}
writeIndex, err := me.wal.LastIndex()
if err != nil {
errs = append(errs, fmt.Errorf("failed to get LastIndex of WAL: %+v", err))
continue
}
err = me.wal.Write(writeIndex+1, bytes)
if err != nil {
errs = append(errs, fmt.Errorf("failed to write to WAL: %+v", err))
continue
}
} else {
// otherwise export directly
errs = append(errs, me.export(ctx, req))
}
}
}
return errors.Join(errs...)
}
// exportToTimeSeries is the default exporting call to GCM.
// Broken into its own function for unit testing.
func (me *MetricsExporter) exportToTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
var err error
if me.cfg.MetricConfig.CreateServiceTimeSeries {
err = me.createServiceTimeSeries(ctx, req)
} else {
err = me.createTimeSeries(ctx, req)
}
return err
}
// export sends a CreateTimeSeriesRequest to GCM and reports failed/successful points based on the response.
func (me *MetricsExporter) export(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
// if this is an empty request, skip it
// empty requests are used by the WAL to signal the end of pending data
if isEmptyReq(req) {
return nil
}
err := me.exportFunc(ctx, req)
s := status.Convert(err)
succeededPoints := len(req.TimeSeries)
failedPoints := 0
for _, detail := range s.Details() {
if summary, ok := detail.(*monitoringpb.CreateTimeSeriesSummary); ok {
failedPoints = int(summary.TotalPointCount - summary.SuccessPointCount)
succeededPoints = int(summary.SuccessPointCount)
}
}
// always record the number of successful points
me.pointsExportedCounter.Add(
ctx,
int64(succeededPoints),
metricapi.WithAttributes(attribute.String("status", "OK")),
)
if failedPoints > 0 {
st := statusCodeToString(s)
me.pointsExportedCounter.Add(
ctx,
int64(failedPoints),
metricapi.WithAttributes(attribute.String("status", st)),
)
}
return err
}
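// For illustration: if a 200-point request partially fails and the returned
// status carries a CreateTimeSeriesSummary with TotalPointCount = 200 and
// SuccessPointCount = 150, the counter above records 150 points with status
// "OK" and 50 points with the mapped RPC code string (e.g. "INVALID_ARGUMENT").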
func statusCodeToString(s *status.Status) string {
// see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
switch c := s.Code(); c {
case codes.OK:
return "OK"
case codes.Canceled:
return "CANCELLED"
case codes.Unknown:
return "UNKNOWN"
case codes.InvalidArgument:
return "INVALID_ARGUMENT"
case codes.DeadlineExceeded:
return "DEADLINE_EXCEEDED"
case codes.NotFound:
return "NOT_FOUND"
case codes.AlreadyExists:
return "ALREADY_EXISTS"
case codes.PermissionDenied:
return "PERMISSION_DENIED"
case codes.ResourceExhausted:
return "RESOURCE_EXHAUSTED"
case codes.FailedPrecondition:
return "FAILED_PRECONDITION"
case codes.Aborted:
return "ABORTED"
case codes.OutOfRange:
return "OUT_OF_RANGE"
case codes.Unimplemented:
return "UNIMPLEMENTED"
case codes.Internal:
return "INTERNAL"
case codes.Unavailable:
return "UNAVAILABLE"
case codes.DataLoss:
return "DATA_LOSS"
case codes.Unauthenticated:
return "UNAUTHENTICATED"
default:
return "CODE_" + strconv.FormatInt(int64(c), 10)
}
}
// readWALAndExport pops the next CreateTimeSeriesRequest from the WAL and tries exporting it.
// If the export is successful (or fails for a non-retryable error), the read index is incremented
// so the next entry in the WAL can be read by a subsequent call to readWALAndExport().
// If the export fails for a (retryable) network error, it will keep trying to export the same entry
// until success or the backoff max is reached.
func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
me.wal.mutex.Lock()
defer me.wal.mutex.Unlock()
// close and reopen the WAL to sync indices
readIndex, writeIndex, err := me.setupWAL()
if err != nil {
return err
}
bytes, err := me.wal.Read(readIndex)
if err == nil {
req := new(monitoringpb.CreateTimeSeriesRequest)
if err = proto.Unmarshal(bytes, req); err != nil {
return err
}
// On network failures, retry with exponential backoff (1s, 2s, 4s, ...) for at
// most 12 attempts, or until the next sleep would reach the user-configured max backoff.
backoff := 0
for i := 0; i < 12; i++ {
err = me.export(ctx, req)
if err != nil {
me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
}
// retry at the same read index if the error is retryable (i.e., a network error)
if isNotRecoverable(err) {
break
}
me.obs.log.Error("retryable error, retrying request")
backoff = 1 << i
if time.Duration(backoff)*time.Second >= me.wal.maxBackoff {
break
}
time.Sleep(time.Duration(backoff) * time.Second)
}
// Check whether we are at the last index and that index is not an empty request.
// (Empty requests fill out the end of a log; if we didn't check for them,
// this would loop forever, appending empty requests onto the end.)
if readIndex == writeIndex && !isEmptyReq(req) {
// This indicates that we are trying to truncate the last item in the WAL.
// If that is the case, write an empty request so we can truncate the last real request
// (the WAL library requires at least 1 entry).
// Doing so prevents double-exporting in the event of a collector restart.
emptyReq := &monitoringpb.CreateTimeSeriesRequest{}
bytes, bytesErr := proto.Marshal(emptyReq)
if bytesErr != nil {
return bytesErr
}
writeIndex++
err = me.wal.Write(writeIndex, bytes)
if err != nil {
return err
}
}
// Truncate if readIndex < writeIndex.
// This only happens if there are more entries in the WAL,
// OR we were at the last real entry and added an "empty" entry above, in which
// case we also incremented writeIndex.
// Otherwise, we've reached the end of the WAL and should be at an empty entry,
// which the export drops.
// If that's the case and we try to truncate (i.e., move readIndex+1), the
// library returns ErrOutOfRange.
if readIndex >= writeIndex {
// wal.ErrNotFound is used by wal.Read() to indicate the end of the WAL, but
// the wal library doesn't know about our hackery around empty entries,
// so we return it here to indicate the same condition.
return wal.ErrNotFound
}
err = me.wal.TruncateFront(readIndex + 1)
if err != nil {
return err
}
}
return err
}
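// For illustration: with the default 1 hour max backoff, a persistently failing
// (but retryable) export above is retried after 1s, 2s, 4s, ..., 2048s, i.e. 12
// attempts over roughly 68 minutes, before the entry is truncated away.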
// watchWALFile watches the WAL file for a write, then returns so the
// runWALReadAndExportLoop() loop can resume reading.
func (me *MetricsExporter) watchWALFile(ctx context.Context) error {
me.goroutines.Add(1)
defer me.goroutines.Done()
walWatcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
err = walWatcher.Add(me.wal.path)
if err != nil {
return err
}
watchCh := make(chan error)
var wErr error
go func() {
defer func() {
watchCh <- wErr
close(watchCh)
walWatcher.Close()
}()
select {
case <-me.shutdownC:
return
case <-ctx.Done():
wErr = ctx.Err()
return
case event, ok := <-walWatcher.Events:
if !ok {
return
}
switch event.Op {
case fsnotify.Remove:
wErr = fmt.Errorf("WAL file deleted")
case fsnotify.Rename:
wErr = fmt.Errorf("WAL file renamed")
case fsnotify.Write:
wErr = nil
}
case watchErr, ok := <-walWatcher.Errors:
if ok {
wErr = watchErr
}
}
}()
err = <-watchCh
return err
}
func (me *MetricsExporter) runWALReadAndExportLoop(ctx context.Context) {
defer me.goroutines.Done()
defer func() {
if err := me.wal.Close(); err != nil {
me.obs.log.Error(fmt.Sprintf("error closing WAL: %+v\n", err))
}
}()
for {
select {
case <-ctx.Done():
return
case <-me.shutdownC:
// do one last read/export, then return;
// otherwise this goroutine could leave some pending metrics unexported
for {
err := me.readWALAndExport(ctx)
if err != nil {
if !errors.Is(err, wal.ErrOutOfRange) {
me.obs.log.Error(fmt.Sprintf("error flushing remaining WAL entries: %+v", err))
}
break
}
}
return
default:
err := me.readWALAndExport(ctx)
if err == nil {
continue
}
// ErrNotFound from wal.Read() means the index is either 0 or out of
// bounds, indicating we're probably at the end of the WAL. That case
// triggers a file watch for new writes (below); all other errors are
// logged. ErrNotFound is expected occasionally once we've reached the
// end of the WAL, so don't bother logging it.
if !errors.Is(err, wal.ErrNotFound) {
me.obs.log.Error(fmt.Sprintf("error reading WAL and exporting: %+v", err))
}
// Start a file watch and block waiting for updates.
if err = me.watchWALFile(ctx); err != nil {
me.obs.log.Error(fmt.Sprintf("error watching WAL and exporting: %+v", err))
}
}
}
}
// exportMetricDescriptorRunner reads metric descriptors from the channel and reports each to GCM once.
func (me *MetricsExporter) exportMetricDescriptorRunner() {
defer me.goroutines.Done()
// We iterate over all metric descriptors until the channel is closed.
// Note: if we get terminated, this will still attempt to export all descriptors
// prior to shutdown.
for {
select {
case <-me.shutdownC:
for {
// We are shutting down. Publish all the pending
// items on the channel before we stop.
select {
case md := <-me.metricDescriptorC:
me.exportMetricDescriptor(md)
default:
// Return and continue graceful shutdown.
return
}
}
case md := <-me.metricDescriptorC:
me.exportMetricDescriptor(md)
}
}
}
func projectName(projectID string) string {
return fmt.Sprintf("projects/%s", projectID)
}
// isNotRecoverable returns true if the error is permanent.
func isNotRecoverable(err error) bool {
s := status.Convert(err)
return !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable)
}
// Helper method to send metric descriptors to GCM.
func (me *MetricsExporter) exportMetricDescriptor(req *monitoringpb.CreateMetricDescriptorRequest) {
cacheKey := fmt.Sprintf("%s/%s", req.Name, req.MetricDescriptor.Type)
if _, exists := me.mdCache[cacheKey]; exists {
return
}
ctx, cancel := context.WithTimeout(context.Background(), me.timeout)
defer cancel()
for _, opt := range me.requestOpts {
opt(&ctx, requestInfo{projectName: req.Name})
}
_, err := me.client.CreateMetricDescriptor(ctx, req)
if err != nil {
if isNotRecoverable(err) {
// cache if the error is non-recoverable
me.mdCache[cacheKey] = req
}
// TODO: Log-once on error, per metric descriptor?
me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", req.MetricDescriptor))
return
}
// cache if we are successful
me.mdCache[cacheKey] = req
}
// Sends a user-defined (custom) metric timeseries.
func (me *MetricsExporter) createTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
ctx, cancel := context.WithTimeout(ctx, me.timeout)
defer cancel()
for _, opt := range me.requestOpts {
opt(&ctx, requestInfo{projectName: req.Name})
}
return me.client.CreateTimeSeries(ctx, req)
}
// Sends a service timeseries.
func (me *MetricsExporter) createServiceTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
ctx, cancel := context.WithTimeout(ctx, me.timeout)
defer cancel()
for _, opt := range me.requestOpts {
opt(&ctx, requestInfo{projectName: req.Name})
}
return me.client.CreateServiceTimeSeries(ctx, req)
}
func (m *metricMapper) instrumentationScopeToLabels(is pcommon.InstrumentationScope) labels {
isLabels := make(labels)
if !m.cfg.MetricConfig.InstrumentationLibraryLabels {
return isLabels
}
instrumentationSource := sanitizeUTF8(is.Name())
if len(instrumentationSource) > 0 {
isLabels["instrumentation_source"] = instrumentationSource
}
instrumentationVersion := sanitizeUTF8(is.Version())
if len(instrumentationVersion) > 0 {
isLabels["instrumentation_version"] = instrumentationVersion
}
return isLabels
}
func (m *metricMapper) metricToTimeSeries(
resource *monitoredrespb.MonitoredResource,
extraLabels labels,
metric pmetric.Metric,
projectID string,
) []*monitoringpb.TimeSeries {
timeSeries := []*monitoringpb.TimeSeries{}
switch metric.Type() {
case pmetric.MetricTypeSum:
sum := metric.Sum()
points := sum.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.sumPointToTimeSeries(resource, extraLabels, metric, sum, points.At(i))
timeSeries = append(timeSeries, ts...)
}
case pmetric.MetricTypeGauge:
gauge := metric.Gauge()
points := gauge.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.gaugePointToTimeSeries(resource, extraLabels, metric, gauge, points.At(i))
timeSeries = append(timeSeries, ts...)
}
case pmetric.MetricTypeSummary:
summary := metric.Summary()
points := summary.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.summaryPointToTimeSeries(resource, extraLabels, metric, summary, points.At(i))
timeSeries = append(timeSeries, ts...)
}
case pmetric.MetricTypeHistogram:
hist := metric.Histogram()
points := hist.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.histogramToTimeSeries(resource, extraLabels, metric, hist, points.At(i), projectID)
timeSeries = append(timeSeries, ts...)
}
case pmetric.MetricTypeExponentialHistogram:
eh := metric.ExponentialHistogram()
points := eh.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.exponentialHistogramToTimeSeries(resource, extraLabels, metric, eh, points.At(i), projectID)
timeSeries = append(timeSeries, ts...)
}
default:
m.obs.log.Error("Unsupported metric data type", zap.Any("data_type", metric.Type()))
}
return timeSeries
}
func (m *metricMapper) summaryPointToTimeSeries(
resource *monitoredrespb.MonitoredResource,
extraLabels labels,
metric pmetric.Metric,
_ pmetric.Summary,
point pmetric.SummaryDataPoint,
) []*monitoringpb.TimeSeries {
if point.Flags().NoRecordedValue() {
// Drop points without a value.
return nil
}
// Normalize the summary point.
metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes())
keep := m.normalizer.NormalizeSummaryDataPoint(point, metricIdentifier)
if !keep {
return nil
}
sumType, countType, quantileType, err := m.summaryMetricTypes(metric)
if err != nil {
m.obs.log.Debug("Failed to get metric type (i.e. name) for summary metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric))
return nil
}
startTime := timestamppb.New(point.StartTimestamp().AsTime())
endTime := timestamppb.New(point.Timestamp().AsTime())
result := []*monitoringpb.TimeSeries{
{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
ValueType: metricpb.MetricDescriptor_DOUBLE,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
StartTime: startTime,
EndTime: endTime,
},
Value: &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: point.Sum(),
}},
}},
Metric: &metricpb.Metric{
Type: sumType,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
),
},
},
{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
ValueType: metricpb.MetricDescriptor_DOUBLE,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
StartTime: startTime,
EndTime: endTime,
},
Value: &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: float64(point.Count()),
}},
}},
Metric: &metricpb.Metric{
Type: countType,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
),
},
},
}
quantiles := point.QuantileValues()
for i := 0; i < quantiles.Len(); i++ {
quantile := quantiles.At(i)
pLabel := labels{
"quantile": strconv.FormatFloat(quantile.Quantile(), 'f', -1, 64),
}
result = append(result, &monitoringpb.TimeSeries{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricpb.MetricDescriptor_GAUGE,
ValueType: metricpb.MetricDescriptor_DOUBLE,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
EndTime: endTime,
},
Value: &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: quantile.Value(),
}},
}},
Metric: &metricpb.Metric{
Type: quantileType,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
pLabel,
),
},
})
}
return result
}
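// For illustration: a summary metric named "rpc_latency" fans out to the types
// "<prefix>/rpc_latency_sum" (cumulative sum), "<prefix>/rpc_latency_count"
// (cumulative count), and "<prefix>/rpc_latency" with one gauge series per
// quantile, labeled e.g. quantile="0.99".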
func (m *metricMapper) exemplar(ex pmetric.Exemplar, projectID string) *distribution.Distribution_Exemplar {
ctx := context.TODO()
attachments := []*anypb.Any{}
// TODO: Look into still sending exemplars with no span.
if traceID, spanID := ex.TraceID(), ex.SpanID(); !traceID.IsEmpty() && !spanID.IsEmpty() {
sctx, err := anypb.New(&monitoringpb.SpanContext{
// TODO - make sure project id is correct.
SpanName: fmt.Sprintf("projects/%s/traces/%s/spans/%s", projectID, hex.EncodeToString(traceID[:]), hex.EncodeToString(spanID[:])),
})
if err == nil {
attachments = append(attachments, sctx)
} else {
// This happens in the event of a logic error (e.g. missing required fields).
// As such, we complain loudly so our unit tests fail.
m.exemplarAttachmentDropCount.Add(ctx, 1)
}
}
if ex.FilteredAttributes().Len() > 0 {
attr, err := anypb.New(&monitoringpb.DroppedLabels{
Label: attributesToLabels(ex.FilteredAttributes()),
})
if err == nil {
attachments = append(attachments, attr)
} else {
// This happens in the event of a logic error (e.g. missing required fields).
// As such, we complain loudly so our unit tests fail.
m.exemplarAttachmentDropCount.Add(ctx, 1)
}
}
var val float64
switch ex.ValueType() {
case pmetric.ExemplarValueTypeDouble:
val = ex.DoubleValue()
case pmetric.ExemplarValueTypeInt:
val = float64(ex.IntValue())
}
return &distribution.Distribution_Exemplar{
Value: val,
Timestamp: timestamppb.New(ex.Timestamp().AsTime()),
Attachments: attachments,
}
}
func (m *metricMapper) exemplars(exs pmetric.ExemplarSlice, projectID string) []*distribution.Distribution_Exemplar {
exemplars := make([]*distribution.Distribution_Exemplar, exs.Len())
for i := 0; i < exs.Len(); i++ {
exemplars[i] = m.exemplar(exs.At(i), projectID)
}
sort.Slice(exemplars, func(i, j int) bool {
return exemplars[i].Value < exemplars[j].Value
})
return exemplars
}
// histogramPoint maps a histogram data point into a GCM point.
func (m *metricMapper) histogramPoint(point pmetric.HistogramDataPoint, projectID string) *monitoringpb.TypedValue {
counts := make([]int64, point.BucketCounts().Len())
var mean, deviation, prevBound float64
for i := 0; i < point.BucketCounts().Len(); i++ {
counts[i] = int64(point.BucketCounts().At(i))
}
if !math.IsNaN(point.Sum()) && point.Count() > 0 { // Avoid divide-by-zero
mean = float64(point.Sum() / float64(point.Count()))
}
bounds := point.ExplicitBounds()
if m.cfg.MetricConfig.EnableSumOfSquaredDeviation {
// Calculate the sum of squared deviation.
for i := 0; i < bounds.Len(); i++ {
// Assume all points in the bucket occur at the middle of the bucket range
middleOfBucket := (prevBound + bounds.At(i)) / 2
deviation += float64(counts[i]) * (middleOfBucket - mean) * (middleOfBucket - mean)
prevBound = bounds.At(i)
}
// The infinity bucket is an implicit +Inf bound after the list of explicit bounds.
// Assume points in the infinity bucket are at the top of the previous bucket
middleOfInfBucket := prevBound
deviation += float64(counts[len(counts)-1]) * (middleOfInfBucket - mean) * (middleOfInfBucket - mean)
}
return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distribution.Distribution{
Count: int64(point.Count()),
Mean: mean,
BucketCounts: counts,
SumOfSquaredDeviation: deviation,
BucketOptions: &distribution.Distribution_BucketOptions{
Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{
Bounds: bounds.AsRaw(),
},
},
},
Exemplars: m.exemplars(point.Exemplars(), projectID),
},
},
}
}
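// For illustration of the deviation estimate above (only computed when
// EnableSumOfSquaredDeviation is set): bounds [1, 2], counts [3, 0, 1], and
// sum 7 give mean 7/4 = 1.75. The three points in the first bucket are assumed
// at (0+1)/2 = 0.5 and the overflow point at the previous bound 2, so the sum
// of squared deviation is 3*(0.5-1.75)^2 + 1*(2-1.75)^2 = 4.75.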
// Maps an exponential distribution into a GCM point.
func (m *metricMapper) exponentialHistogramPoint(point pmetric.ExponentialHistogramDataPoint, projectID string) *monitoringpb.TypedValue {
// First calculate underflow bucket with all negatives + zeros.
underflow := point.ZeroCount()
negativeBuckets := point.Negative().BucketCounts()
for i := 0; i < negativeBuckets.Len(); i++ {
underflow += negativeBuckets.At(i)
}
// Next, pull in remaining buckets.
counts := make([]int64, point.Positive().BucketCounts().Len()+2)
bucketOptions := &distribution.Distribution_BucketOptions{}
counts[0] = int64(underflow)
positiveBuckets := point.Positive().BucketCounts()
for i := 0; i < positiveBuckets.Len(); i++ {
counts[i+1] = int64(positiveBuckets.At(i))
}
// Overflow bucket is always empty
counts[len(counts)-1] = 0
if point.Positive().BucketCounts().Len() == 0 {
// We cannot send exponential distributions with no positive buckets;
// instead we send a simple overflow/underflow histogram.
bucketOptions.Options = &distribution.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{
Bounds: []float64{0},
},
}
} else {
// Exponential histogram
growth := math.Exp2(math.Exp2(-float64(point.Scale())))
scale := math.Pow(growth, float64(point.Positive().Offset()))
bucketOptions.Options = &distribution.Distribution_BucketOptions_ExponentialBuckets{
ExponentialBuckets: &distribution.Distribution_BucketOptions_Exponential{
GrowthFactor: growth,
Scale: scale,
NumFiniteBuckets: int32(len(counts) - 2),
},
}
}
mean := float64(0)
if !math.IsNaN(point.Sum()) && point.Count() > 0 { // Avoid divide-by-zero
mean = float64(point.Sum() / float64(point.Count()))
}
return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distribution.Distribution{
Count: int64(point.Count()),
Mean: mean,
BucketCounts: counts,
BucketOptions: bucketOptions,
Exemplars: m.exemplars(point.Exemplars(), projectID),
},
},
}
}
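// For illustration of the exponential mapping above: growth = 2^(2^-scale), so
// scale 0 gives growth 2 (buckets (1,2], (2,4], ...), scale 1 gives growth
// sqrt(2) ≈ 1.414, and scale -1 gives growth 4. With scale 0 and a positive
// offset of 3, Scale = growth^offset = 8, so the first finite bucket starts at 8.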
func (m *metricMapper) histogramToTimeSeries(
resource *monitoredrespb.MonitoredResource,
extraLabels labels,
metric pmetric.Metric,
hist pmetric.Histogram,
point pmetric.HistogramDataPoint,
projectID string,
) []*monitoringpb.TimeSeries {
if point.Flags().NoRecordedValue() || !point.HasSum() || point.ExplicitBounds().Len() == 0 {
// Drop points without a value, a sum, or explicit bounds.
m.obs.log.Debug("Metric has no value, sum, or explicit bounds. Dropping the metric.", zap.Any("metric", metric))
return nil
}
t, err := m.metricNameToType(metric.Name(), metric)
if err != nil {
m.obs.log.Debug("Failed to get metric type (i.e. name) for histogram metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric))
return nil
}
if hist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
// Normalize cumulative histogram points.
metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes())
keep := m.normalizer.NormalizeHistogramDataPoint(point, metricIdentifier)
if !keep {
return nil
}
}
// We treat deltas as cumulatives w/ resets.
metricKind := metricpb.MetricDescriptor_CUMULATIVE
startTime := timestamppb.New(point.StartTimestamp().AsTime())
endTime := timestamppb.New(point.Timestamp().AsTime())
value := m.histogramPoint(point, projectID)
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
ValueType: metricpb.MetricDescriptor_DISTRIBUTION,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
StartTime: startTime,
EndTime: endTime,
},
Value: value,
}},
Metric: &metricpb.Metric{
Type: t,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
),
},
}}
}
func (m *metricMapper) exponentialHistogramToTimeSeries(
resource *monitoredrespb.MonitoredResource,
extraLabels labels,
metric pmetric.Metric,
exponentialHist pmetric.ExponentialHistogram,
point pmetric.ExponentialHistogramDataPoint,
projectID string,
) []*monitoringpb.TimeSeries {
if point.Flags().NoRecordedValue() {
// Drop points without a value.
return nil
}
t, err := m.metricNameToType(metric.Name(), metric)
if err != nil {
m.obs.log.Debug("Failed to get metric type (i.e. name) for exponential histogram metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric))
return nil
}
if exponentialHist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
// Normalize the histogram point.
metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes())
keep := m.normalizer.NormalizeExponentialHistogramDataPoint(point, metricIdentifier)
if !keep {
return nil
}
}
// We treat deltas as cumulatives w/ resets.
metricKind := metricpb.MetricDescriptor_CUMULATIVE
startTime := timestamppb.New(point.StartTimestamp().AsTime())
endTime := timestamppb.New(point.Timestamp().AsTime())
value := m.exponentialHistogramPoint(point, projectID)
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
ValueType: metricpb.MetricDescriptor_DISTRIBUTION,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
StartTime: startTime,
EndTime: endTime,
},
Value: value,
}},
Metric: &metricpb.Metric{
Type: t,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
),
},
}}
}
func (m *metricMapper) sumPointToTimeSeries(
resource *monitoredrespb.MonitoredResource,
extraLabels labels,
metric pmetric.Metric,
sum pmetric.Sum,
point pmetric.NumberDataPoint,
) []*monitoringpb.TimeSeries {
metricKind := metricpb.MetricDescriptor_CUMULATIVE
var startTime *timestamppb.Timestamp
if point.Flags().NoRecordedValue() {
// Drop points without a value. This may be a staleness marker from
// prometheus.
return nil
}
t, err := m.metricNameToType(metric.Name(), metric)
if err != nil {
m.obs.log.Debug("Failed to get metric type (i.e. name) for sum metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric))
return nil
}
if sum.IsMonotonic() {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes())
keep := m.normalizer.NormalizeNumberDataPoint(point, metricIdentifier)
if !keep {
return nil
}
}
startTime = timestamppb.New(point.StartTimestamp().AsTime())
} else {
metricKind = metricpb.MetricDescriptor_GAUGE
startTime = nil
}
value, valueType := m.numberDataPointToValue(point, metricKind, metric.Unit())
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
ValueType: valueType,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
StartTime: startTime,
EndTime: timestamppb.New(point.Timestamp().AsTime()),
},
Value: value,
}},
Metric: &metricpb.Metric{
Type: t,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
),
},
}}
}
func (m *metricMapper) gaugePointToTimeSeries(
resource *monitoredrespb.MonitoredResource,
extraLabels labels,
metric pmetric.Metric,
_ pmetric.Gauge,
point pmetric.NumberDataPoint,
) []*monitoringpb.TimeSeries {
if point.Flags().NoRecordedValue() {
// Drop points without a value.
return nil
}
t, err := m.metricNameToType(metric.Name(), metric)
if err != nil {
m.obs.log.Debug("Unable to get metric type (i.e. name) for gauge metric.", zap.Error(err), zap.Any("metric", metric))
return nil
}
metricKind := metricpb.MetricDescriptor_GAUGE
value, valueType := m.numberDataPointToValue(point, metricKind, metric.Unit())
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
ValueType: valueType,
Points: []*monitoringpb.Point{{
Interval: &monitoringpb.TimeInterval{
EndTime: timestamppb.New(point.Timestamp().AsTime()),
},
Value: value,
}},
Metric: &metricpb.Metric{
Type: t,
Labels: mergeLabels(
attributesToLabels(point.Attributes()),
extraLabels,
),
},
}}
}
// getMetricNamePrefix returns any configured prefix to add to unknown metric names.
func (m *metricMapper) getMetricNamePrefix(name string) string {
for _, domain := range m.cfg.MetricConfig.KnownDomains {
if strings.Contains(name, domain) {
return ""
}
}
return m.cfg.MetricConfig.Prefix
}
// metricNameToType maps OTLP metric name to GCM metric type (aka name).
func (m *metricMapper) metricNameToType(name string, metric pmetric.Metric) (string, error) {
metricName, err := m.cfg.MetricConfig.GetMetricName(name, metric)
if err != nil {
return "", err
}
return path.Join(m.getMetricNamePrefix(metricName), metricName), nil
}
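// For illustration: with the default prefix "workload.googleapis.com" and no
// known-domain match, the metric name "my.metric" maps to the type
// "workload.googleapis.com/my.metric", while a name already containing a known
// domain (e.g. "compute.googleapis.com/instance/cpu") is left unprefixed.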
// defaultGetMetricName does not (further) customize the baseName.
func defaultGetMetricName(baseName string, _ pmetric.Metric) (string, error) {
return baseName, nil
}
// numberDataPointToValue converts a pdata number data point into a GCM typed value and value type.
func (m *metricMapper) numberDataPointToValue(
point pmetric.NumberDataPoint,
metricKind metricpb.MetricDescriptor_MetricKind,
metricUnit string,
) (*monitoringpb.TypedValue, metricpb.MetricDescriptor_ValueType) {
supportedTypedValue, supportedValueType := m.convertToBoolIfMetricKindSupported(point, metricKind, metricUnit)
if supportedValueType != metricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED {
return supportedTypedValue, supportedValueType
}
if point.ValueType() == pmetric.NumberDataPointValueTypeInt {
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: point.IntValue(),
}},
metricpb.MetricDescriptor_INT64
}
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: point.DoubleValue(),
}},
metricpb.MetricDescriptor_DOUBLE
}
// convertToBoolIfMetricKindSupported maps metric values to types that are supported
// in Google Cloud Monitoring but not in OTel; currently the only such type is BOOL.
// The conversion happens only for metric kind GAUGE, and only when the conversion
// intent is indicated via the unit. If the conditions are met, the converted value
// and type are returned; otherwise a nil value with value type
// MetricDescriptor_VALUE_TYPE_UNSPECIFIED is returned, indicating an unsupported
// type or a failure to meet the conversion constraints.
func (me *metricMapper) convertToBoolIfMetricKindSupported(
point pmetric.NumberDataPoint,
metricKind metricpb.MetricDescriptor_MetricKind,
metricUnit string,
) (*monitoringpb.TypedValue, metricpb.MetricDescriptor_ValueType) {
if metricUnit == specialIntToBoolUnit {
if metricKind == metricpb.MetricDescriptor_GAUGE && point.ValueType() == pmetric.NumberDataPointValueTypeInt {
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_BoolValue{
BoolValue: point.IntValue() != 0,
}},
metricpb.MetricDescriptor_BOOL
}
me.obs.log.Warn("Failed to interpret metric as BOOL. Attempted conversion on BOOL metrics are only supported on integer valued gauges", zap.Any("metric_kind", metricKind), zap.Any("value_type", point.ValueType()))
}
return nil, metricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
}
func attributesToLabels(attrs pcommon.Map) labels {
ls := make(labels, attrs.Len())
attrs.Range(func(k string, v pcommon.Value) bool {
ls[sanitizeKey(k)] = sanitizeUTF8(v.AsString())
return true
})
return ls
}
func sanitizeUTF8(s string) string {
return strings.ToValidUTF8(s, "�")
}
// sanitizeKey replaces non-alphanumeric characters with underscores. Note, this does
// not truncate label keys longer than 100 characters or prepend "key" when the first
// character is "_" like OpenCensus did.
func sanitizeKey(s string) string {
if len(s) == 0 {
return s
}
s = strings.Map(sanitizeRune, s)
if unicode.IsDigit(rune(s[0])) {
s = "key_" + s
}
return s
}
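// For illustration: sanitizeKey("net.host-9") yields "net_host_9", and a key
// starting with a digit, such as "9p_write", becomes "key_9p_write".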
// converts anything that is not a letter or digit to an underscore.
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
// Everything else turns into an underscore
return '_'
}
func mergeLabels(mergeInto labels, others ...labels) labels {
if mergeInto == nil {
mergeInto = labels{}
}
for _, ls := range others {
for k, v := range ls {
mergeInto[k] = v
}
}
return mergeInto
}
// metricTypeToDisplayName takes a GCM metric type (like workload.googleapis.com/MyCoolMetric) and returns the display name.
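// For illustration: "workload.googleapis.com/MyCoolMetric" maps to the display
// name "MyCoolMetric", and "custom.googleapis.com/foo/bar" maps to "foo/bar".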
func (m *metricMapper) metricTypeToDisplayName(mURL string) string {
// TODO - user configuration around display name?
// Default: strip domain, keep path after domain.
u, err := url.Parse(fmt.Sprintf("metrics://%s", mURL))
if err != nil || u.Path == "" {
return mURL
}
return strings.TrimLeft(u.Path, "/")
}
// Returns label descriptors for a metric.
func (m *metricMapper) labelDescriptors(
pm pmetric.Metric,
extraLabels labels,
) []*label.LabelDescriptor {
// TODO - allow customization of label descriptions.
result := []*label.LabelDescriptor{}
for key := range extraLabels {
result = append(result, &label.LabelDescriptor{
Key: sanitizeKey(key),
})
}
seenKeys := map[string]struct{}{}
addAttributes := func(attr pcommon.Map) {
attr.Range(func(key string, _ pcommon.Value) bool {
// Skip keys that have already been set
if _, ok := seenKeys[sanitizeKey(key)]; ok {
return true
}
result = append(result, &label.LabelDescriptor{
Key: sanitizeKey(key),
})
seenKeys[sanitizeKey(key)] = struct{}{}
return true
})
}
switch pm.Type() {
case pmetric.MetricTypeGauge:
points := pm.Gauge().DataPoints()
for i := 0; i < points.Len(); i++ {
addAttributes(points.At(i).Attributes())
}
case pmetric.MetricTypeSum:
points := pm.Sum().DataPoints()
for i := 0; i < points.Len(); i++ {
addAttributes(points.At(i).Attributes())
}
case pmetric.MetricTypeSummary:
points := pm.Summary().DataPoints()
for i := 0; i < points.Len(); i++ {
addAttributes(points.At(i).Attributes())
}
case pmetric.MetricTypeHistogram:
points := pm.Histogram().DataPoints()
for i := 0; i < points.Len(); i++ {
addAttributes(points.At(i).Attributes())
}
case pmetric.MetricTypeExponentialHistogram:
points := pm.ExponentialHistogram().DataPoints()
for i := 0; i < points.Len(); i++ {
addAttributes(points.At(i).Attributes())
}
}
return result
}
// Returns (sum, count, quantile) metric types (i.e. names) for a summary metric.
func (m *metricMapper) summaryMetricTypes(pm pmetric.Metric) (string, string, string, error) {
sumType, err := m.metricNameToType(pm.Name()+SummarySumSuffix, pm)
if err != nil {
return "", "", "", err
}
countType, err := m.metricNameToType(pm.Name()+SummaryCountPrefix, pm)
if err != nil {
return "", "", "", err
}
quantileType, err := m.metricNameToType(pm.Name(), pm)
if err != nil {
return "", "", "", err
}
return sumType, countType, quantileType, nil
}
func (m *metricMapper) summaryMetricDescriptors(
pm pmetric.Metric,
extraLabels labels,
) []*metricpb.MetricDescriptor {
sumType, countType, quantileType, err := m.summaryMetricTypes(pm)
if err != nil {
m.obs.log.Debug("Failed to get metric types (i.e. names) for summary metric. Dropping the metric.", zap.Error(err), zap.Any("metric", pm))
return nil
}
labels := m.labelDescriptors(pm, extraLabels)
return []*metricpb.MetricDescriptor{
{
Type: sumType,
Labels: labels,
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
ValueType: metricpb.MetricDescriptor_DOUBLE,
Unit: pm.Unit(),
Description: pm.Description(),
DisplayName: pm.Name() + SummarySumSuffix,
},
{
Type: countType,
Labels: labels,
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
ValueType: metricpb.MetricDescriptor_DOUBLE,
Unit: pm.Unit(),
Description: pm.Description(),
DisplayName: pm.Name() + SummaryCountPrefix,
},
{
Type: quantileType,
Labels: append(
labels,
&label.LabelDescriptor{
Key: "quantile",
Description: "the value at a given quantile of a distribution",
}),
MetricKind: metricpb.MetricDescriptor_GAUGE,
ValueType: metricpb.MetricDescriptor_DOUBLE,
Unit: pm.Unit(),
Description: pm.Description(),
DisplayName: pm.Name(),
},
}
}
// metricDescriptor extracts the metric descriptor(s) from a metric.
func (m *metricMapper) metricDescriptor(
pm pmetric.Metric,
extraLabels labels,
) []*metricpb.MetricDescriptor {
if pm.Type() == pmetric.MetricTypeSummary {
return m.summaryMetricDescriptors(pm, extraLabels)
}
kind, typ := m.mapMetricPointKind(pm)
if kind == metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED {
m.obs.log.Debug("Failed to get metric kind (i.e. aggregation) for metric descriptor. Dropping the metric descriptor.", zap.Any("metric", pm))
return nil
}
if typ == metricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED {
m.obs.log.Debug("Failed to get metric type (int / double) for metric descriptor. Dropping the metric descriptor.", zap.Any("metric", pm))
return nil
}
metricType, err := m.metricNameToType(pm.Name(), pm)
if err != nil {
m.obs.log.Debug("Failed to get metric type (i.e. name) for metric descriptor. Dropping the metric descriptor.", zap.Error(err), zap.Any("metric", pm))
return nil
}
labels := m.labelDescriptors(pm, extraLabels)
return []*metricpb.MetricDescriptor{
{
Name: pm.Name(),
DisplayName: m.metricTypeToDisplayName(metricType),
Type: metricType,
MetricKind: kind,
ValueType: typ,
Unit: pm.Unit(),
Description: pm.Description(),
Labels: labels,
},
}
}
func metricPointValueType(pt pmetric.NumberDataPointValueType) metricpb.MetricDescriptor_ValueType {
switch pt {
case pmetric.NumberDataPointValueTypeInt:
return metricpb.MetricDescriptor_INT64
case pmetric.NumberDataPointValueTypeDouble:
return metricpb.MetricDescriptor_DOUBLE
default:
return metricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
}
}
func isEmptyReq(req *monitoringpb.CreateTimeSeriesRequest) bool {
return (req == nil || (req.Name == "" && req.TimeSeries == nil))
}
func (me *metricMapper) mapMetricPointKind(m pmetric.Metric) (metricpb.MetricDescriptor_MetricKind, metricpb.MetricDescriptor_ValueType) {
var kind metricpb.MetricDescriptor_MetricKind
var typ metricpb.MetricDescriptor_ValueType
switch m.Type() {
case pmetric.MetricTypeGauge:
kind = metricpb.MetricDescriptor_GAUGE
if m.Gauge().DataPoints().Len() > 0 {
_, supportedType := me.convertToBoolIfMetricKindSupported(m.Gauge().DataPoints().At(0), kind, m.Unit())
if supportedType != metricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED {
typ = supportedType
} else {
typ = metricPointValueType(m.Gauge().DataPoints().At(0).ValueType())
}
}
case pmetric.MetricTypeSum:
if !m.Sum().IsMonotonic() {
kind = metricpb.MetricDescriptor_GAUGE
} else {
kind = metricpb.MetricDescriptor_CUMULATIVE
}
if m.Sum().DataPoints().Len() > 0 {
typ = metricPointValueType(m.Sum().DataPoints().At(0).ValueType())
}
case pmetric.MetricTypeSummary:
kind = metricpb.MetricDescriptor_GAUGE
case pmetric.MetricTypeHistogram:
typ = metricpb.MetricDescriptor_DISTRIBUTION
kind = metricpb.MetricDescriptor_CUMULATIVE
case pmetric.MetricTypeExponentialHistogram:
typ = metricpb.MetricDescriptor_DISTRIBUTION
kind = metricpb.MetricDescriptor_CUMULATIVE
default:
kind = metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED
typ = metricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
}
return kind, typ
}