internal/tel/v2/tel.go (247 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package tel provides telemetry into the connector's internal operations. package tel import ( "context" "strings" "time" "cloud.google.com/go/alloydbconn/debug" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/api/option" cmexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" ) const ( meterName = "alloydb.googleapis.com/client/connector" monitoredResource = "alloydb.googleapis.com/InstanceClient" dialCount = "dial_count" dialLatency = "dial_latencies" openConnections = "open_connections" bytesSent = "bytes_sent_count" bytesReceived = "bytes_received_count" refreshCount = "refresh_count" // ProjectID specifies the instance's parent project. ProjectID = "project_id" // Location specifies the instances region (aka location). Location = "location" // Cluster specifies the cluster name. Cluster = "cluster_id" // Instance specifies the instance name. Instance = "instance_id" // ClientID is a unique ID specifying the instance of the // alloydbconn.Dialer. 
ClientID = "client_uid" // connectorType is one of go or auth-proxy connectorType = "connector_type" // authType is one of iam or built-in authType = "auth_type" // isCacheHit reports whether connection info was available in the cache isCacheHit = "is_cache_hit" // status indicates whether the dial attempt succeeded or not. status = "status" // refreshType indicates whether the cache is a refresh ahead cache or a // lazy cache. refreshType = "refresh_type" // DialSuccess indicates the dial attempt succeeded. DialSuccess = "success" // DialUserError indicates the dial attempt failed due to a user mistake. DialUserError = "user_error" // DialCacheError indicates the dialer failed to retrieved the cached // connection info. DialCacheError = "cache_error" // DialTCPError indicates a TCP-level error. DialTCPError = "tcp_error" // DialTLSError indicates an error with the TLS connection. DialTLSError = "tls_error" // DialMDXError indicates an error with the metadata exchange. DialMDXError = "mdx_error" // RefreshSuccess indicates the refresh operation to retrieve new // connection info succeeded. RefreshSuccess = "success" // RefreshFailure indicates the refresh operation failed. RefreshFailure = "failure" // RefreshAheadType indicates the dialer is using a refresh ahead cache. RefreshAheadType = "refresh_ahead" // RefreshLazyType indicates the dialer is using a lazy cache. RefreshLazyType = "lazy" ) // Config holds all the necessary information to configure a MetricRecorder. type Config struct { // Enabled specifies whether the metrics should be enabled. Enabled bool // Version is the version of the alloydbconn.Dialer. Version string // ClientID uniquely identifies the instance of the alloydbconn.Dialer. ClientID string // ProjectID is the project ID of the AlloyDB instance. ProjectID string // LocationAlloyDBs the location of the AlloyDB instance. Location string // Cluster is the name of the AlloyDB cluster. 
Cluster string // Instance is the name of the AlloyDB instance. Instance string } // MetricRecorder defines the interface for recording metrics related to the // internal operations of alloydbconn.Dialer. type MetricRecorder interface { Shutdown(context.Context) error RecordBytesRxCount(context.Context, int64, Attributes) RecordBytesTxCount(context.Context, int64, Attributes) RecordDialCount(context.Context, Attributes) RecordDialLatency(context.Context, int64, Attributes) RecordOpenConnection(context.Context, Attributes) RecordClosedConnection(context.Context, Attributes) RecordRefreshCount(context.Context, Attributes) } // DefaultExportInterval is the interval that the metric exporter runs. It // should always be 60s. This value is exposed as a var to faciliate testing. var DefaultExportInterval = 60 * time.Second // NewMetricRecorder creates a MetricRecorder. When the configuration is not // enabled, a null recorder is returned instead. func NewMetricRecorder(ctx context.Context, l debug.ContextLogger, cfg Config, opts ...option.ClientOption) MetricRecorder { if !cfg.Enabled { l.Debugf(ctx, "disabling built-in metrics") return NullMetricRecorder{} } eopts := []cmexporter.Option{ cmexporter.WithCreateServiceTimeSeries(), cmexporter.WithProjectID(cfg.ProjectID), cmexporter.WithMonitoringClientOptions(opts...), cmexporter.WithMetricDescriptorTypeFormatter(func(m metricdata.Metrics) string { return "alloydb.googleapis.com/client/connector/" + m.Name }), cmexporter.WithMonitoredResourceDescription(monitoredResource, []string{ ProjectID, Location, Cluster, Instance, ClientID, }), } exp, err := cmexporter.New(eopts...) if err != nil { l.Debugf(ctx, "built-in metrics exporter failed to initialize: %v", err) return NullMetricRecorder{} } res := resource.NewWithAttributes(monitoredResource, // The gcp.resource_type is a special attribute that the exporter // transforms into the MonitoredResource field. 
attribute.String("gcp.resource_type", monitoredResource), attribute.String(ProjectID, cfg.ProjectID), attribute.String(Location, cfg.Location), attribute.String(Cluster, cfg.Cluster), attribute.String(Instance, cfg.Instance), attribute.String(ClientID, cfg.ClientID), ) p := sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader( exp, // The periodic reader runs every 60 seconds by default, but set // the value anyway to be defensive. sdkmetric.WithInterval(DefaultExportInterval), )), sdkmetric.WithResource(res), ) m := p.Meter(meterName, metric.WithInstrumentationVersion(cfg.Version)) mDialCount, err := m.Int64Counter(dialCount) if err != nil { _ = exp.Shutdown(ctx) l.Debugf(ctx, "built-in metrics exporter failed to initialize dial count metric: %v", err) return NullMetricRecorder{} } mDialLatency, err := m.Float64Histogram(dialLatency) if err != nil { _ = exp.Shutdown(ctx) l.Debugf(ctx, "built-in metrics exporter failed to initialize dial latency metric: %v", err) return NullMetricRecorder{} } mOpenConns, err := m.Int64UpDownCounter(openConnections) if err != nil { _ = exp.Shutdown(ctx) l.Debugf(ctx, "built-in metrics exporter failed to initialize open connections metric: %v", err) return NullMetricRecorder{} } mBytesTx, err := m.Int64Counter(bytesSent) if err != nil { _ = exp.Shutdown(ctx) l.Debugf(ctx, "built-in metrics exporter failed to initialize bytes sent metric: %v", err) return NullMetricRecorder{} } mBytesRx, err := m.Int64Counter(bytesReceived) if err != nil { _ = exp.Shutdown(ctx) l.Debugf(ctx, "built-in metrics exporter failed to initialize bytes received metric: %v", err) return NullMetricRecorder{} } mRefreshCount, err := m.Int64Counter(refreshCount) if err != nil { _ = exp.Shutdown(ctx) l.Debugf(ctx, "built-in metrics exporter failed to initialize refresh count metric: %v", err) return NullMetricRecorder{} } return &metricRecorder{ exporter: exp, provider: p, dialerID: cfg.ClientID, mDialCount: mDialCount, mDialLatency: 
mDialLatency, mOpenConns: mOpenConns, mBytesTx: mBytesTx, mBytesRx: mBytesRx, mRefreshCount: mRefreshCount, } } // metricRecorder holds the various counters that track internal operations. type metricRecorder struct { exporter sdkmetric.Exporter provider *sdkmetric.MeterProvider dialerID string mDialCount metric.Int64Counter mDialLatency metric.Float64Histogram mOpenConns metric.Int64UpDownCounter mBytesTx metric.Int64Counter mBytesRx metric.Int64Counter mRefreshCount metric.Int64Counter } // Shutdown should be called when the MetricRecorder is no longer needed. func (m *metricRecorder) Shutdown(ctx context.Context) error { // Shutdown only the provider. The provider will shutdown the exporter as // part of its own shutdown, i.e., provider shuts down the reader, the // reader shuts down the exporter. So one shutdown call here is enough. return m.provider.Shutdown(ctx) } func connectorTypeValue(userAgent string) string { if strings.Contains(userAgent, "auth-proxy") { return "auth_proxy" } return "go" } func authTypeValue(iamAuthn bool) string { if iamAuthn { return "iam" } return "built_in" } // Attributes holds all the various pieces of metadata to attach to a metric. type Attributes struct { // IAMAuthN specifies whether IAM authentication is enabled. IAMAuthN bool // UserAgent is the full user-agent of the alloydbconn.Dialer. UserAgent string // CacheHit specifies whether connection info was present in the cache. CacheHit bool // DialStatus specifies the result of the dial attempt. DialStatus string // RefreshStatus specifies the result of the refresh operation. RefreshStatus string // RefreshType specifies the type of cache in use (e.g., refresh ahead or // lazy). RefreshType string } // RecordBytesRxCount records the number of bytes received for a particular // instance. 
func (m *metricRecorder) RecordBytesRxCount(ctx context.Context, bytes int64, a Attributes) {
	// Byte counters are labelled with the connector type only.
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
	)
	m.mBytesRx.Add(ctx, bytes, metric.WithAttributeSet(attrs))
}

// RecordBytesTxCount adds bytes to the running total of bytes sent, labelled
// with the connector type derived from the user agent.
func (m *metricRecorder) RecordBytesTxCount(ctx context.Context, bytes int64, a Attributes) {
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
	)
	m.mBytesTx.Add(ctx, bytes, metric.WithAttributeSet(attrs))
}

// RecordDialCount counts a single dial attempt, labelled with the connector
// type, the authentication type, whether the cache was hit, and the dial
// status.
func (m *metricRecorder) RecordDialCount(ctx context.Context, a Attributes) {
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
		attribute.String(authType, authTypeValue(a.IAMAuthN)),
		attribute.Bool(isCacheHit, a.CacheHit),
		attribute.String(status, a.DialStatus),
	)
	m.mDialCount.Add(ctx, 1, metric.WithAttributeSet(attrs))
}

// RecordDialLatency records the latency, in milliseconds, of one dial
// attempt, labelled with the connector type.
func (m *metricRecorder) RecordDialLatency(ctx context.Context, latencyMS int64, a Attributes) {
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
	)
	// The histogram instrument takes float64 measurements.
	m.mDialLatency.Record(ctx, float64(latencyMS), metric.WithAttributeSet(attrs))
}

// RecordOpenConnection adds one to the open connections counter, labelled
// with the connector type and the authentication type.
func (m *metricRecorder) RecordOpenConnection(ctx context.Context, a Attributes) {
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
		attribute.String(authType, authTypeValue(a.IAMAuthN)),
	)
	m.mOpenConns.Add(ctx, 1, metric.WithAttributeSet(attrs))
}

// RecordClosedConnection decrements the number of open connections.
func (m *metricRecorder) RecordClosedConnection(ctx context.Context, a Attributes) {
	// Uses the same attribute keys as RecordOpenConnection, with a delta of -1.
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
		attribute.String(authType, authTypeValue(a.IAMAuthN)),
	)
	m.mOpenConns.Add(ctx, -1, metric.WithAttributeSet(attrs))
}

// RecordRefreshCount counts a single refresh operation, labelled with the
// connector type, the refresh status, and the refresh (cache) type.
func (m *metricRecorder) RecordRefreshCount(ctx context.Context, a Attributes) {
	attrs := attribute.NewSet(
		attribute.String(connectorType, connectorTypeValue(a.UserAgent)),
		attribute.String(status, a.RefreshStatus),
		attribute.String(refreshType, a.RefreshType),
	)
	m.mRefreshCount.Add(ctx, 1, metric.WithAttributeSet(attrs))
}

// NullMetricRecorder is a MetricRecorder whose methods all do nothing. It is
// what callers receive when the built-in metrics are disabled.
type NullMetricRecorder struct{}

// Shutdown does nothing and always returns nil.
func (NullMetricRecorder) Shutdown(_ context.Context) error {
	return nil
}

// RecordBytesRxCount does nothing.
func (NullMetricRecorder) RecordBytesRxCount(_ context.Context, _ int64, _ Attributes) {
}

// RecordBytesTxCount does nothing.
func (NullMetricRecorder) RecordBytesTxCount(_ context.Context, _ int64, _ Attributes) {
}

// RecordDialCount does nothing.
func (NullMetricRecorder) RecordDialCount(_ context.Context, _ Attributes) {
}

// RecordDialLatency does nothing.
func (NullMetricRecorder) RecordDialLatency(_ context.Context, _ int64, _ Attributes) {
}

// RecordOpenConnection does nothing.
func (NullMetricRecorder) RecordOpenConnection(_ context.Context, _ Attributes) {
}

// RecordClosedConnection does nothing.
func (NullMetricRecorder) RecordClosedConnection(_ context.Context, _ Attributes) {
}

// RecordRefreshCount does nothing.
func (NullMetricRecorder) RecordRefreshCount(_ context.Context, _ Attributes) {
}