common/otel_metrics.go (133 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"context"
"errors"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// Names used to annotate recorded metrics.
const (
	// IOMethod labels the event that opens or closes a connection or file.
	IOMethod = "io_method"

	// GCSMethod labels the method that was called in the GCS client library.
	GCSMethod = "gcs_method"

	// FSOp labels the file system op that was processed.
	FSOp = "fs_op"

	// FSErrCategory groups file system errors together so that the
	// cardinality of FSError is reduced.
	FSErrCategory = "fs_error_category"

	// ReadType labels a read operation with its type: Sequential or Random.
	ReadType = "read_type"

	// CacheHit labels a read served through the file cache with true or false.
	CacheHit = "cache_hit"
)
// Package-level meters obtained from the global OpenTelemetry API. Every
// instrument built in NewOTelMetrics is created on one of these.
var (
	// fsOpsMeter is the meter named "fs_op"; it owns the fs/* instruments.
	fsOpsMeter = otel.Meter("fs_op")

	// gcsMeter is the meter named "gcs"; it owns the gcs/* instruments.
	gcsMeter = otel.Meter("gcs")

	// fileCacheMeter is the meter named "file_cache"; it owns the
	// file_cache/* instruments.
	fileCacheMeter = otel.Meter("file_cache")
)
// otelMetrics holds every OpenTelemetry instrument that GCSFuse records to.
// Instances are built by NewOTelMetrics; the methods on this type forward
// measurements to the matching instrument.
type otelMetrics struct {
	// Instruments for file system ops.
	fsOpsCount      metric.Int64Counter
	fsOpsErrorCount metric.Int64Counter
	fsOpsLatency    metric.Float64Histogram

	// Instruments for GCS reads, readers, requests and downloads.
	gcsReadCount          metric.Int64Counter
	gcsReaderCount        metric.Int64Counter
	gcsRequestCount       metric.Int64Counter
	gcsRequestLatency     metric.Float64Histogram
	gcsDownloadBytesCount metric.Int64Counter

	// gcsReadBytesCountAtomic is the running total that GCSReadBytesCount
	// adds to. It is a plain atomic rather than an instrument.
	gcsReadBytesCountAtomic *atomic.Int64

	// Instruments for reads served through the file cache.
	fileCacheReadCount      metric.Int64Counter
	fileCacheReadBytesCount metric.Int64Counter
	fileCacheReadLatency    metric.Float64Histogram
}
// attrsToRecordOption converts attrs into the options passed to a histogram's
// Record call. Each MetricAttr becomes a string attribute keyed by attr.Key.
//
// All attributes are folded into a single metric.WithAttributes option so that
// one attribute set is built per measurement, instead of one set per attribute
// that the SDK then has to merge. The resulting attribute set is the same:
// when a key repeats, the last value wins in both forms.
//
// An empty (or nil) attrs yields an empty, non-nil slice.
func attrsToRecordOption(attrs []MetricAttr) []metric.RecordOption {
	if len(attrs) == 0 {
		return []metric.RecordOption{}
	}
	kvs := make([]attribute.KeyValue, len(attrs))
	for i, attr := range attrs {
		kvs[i] = attribute.String(attr.Key, attr.Value)
	}
	return []metric.RecordOption{metric.WithAttributes(kvs...)}
}
// attrsToAddOption converts attrs into the options passed to a counter's Add
// call. Each MetricAttr becomes a string attribute keyed by attr.Key.
//
// All attributes are folded into a single metric.WithAttributes option so that
// one attribute set is built per measurement, instead of one set per attribute
// that the SDK then has to merge. The resulting attribute set is the same:
// when a key repeats, the last value wins in both forms.
//
// An empty (or nil) attrs yields an empty, non-nil slice.
func attrsToAddOption(attrs []MetricAttr) []metric.AddOption {
	if len(attrs) == 0 {
		return []metric.AddOption{}
	}
	kvs := make([]attribute.KeyValue, len(attrs))
	for i, attr := range attrs {
		kvs[i] = attribute.String(attr.Key, attr.Value)
	}
	return []metric.AddOption{metric.WithAttributes(kvs...)}
}
// GCSReadBytesCount atomically adds inc to the running total held in
// gcsReadBytesCountAtomic. The context argument is ignored.
func (o *otelMetrics) GCSReadBytesCount(_ context.Context, inc int64) {
	total := o.gcsReadBytesCountAtomic
	total.Add(inc)
}
// GCSReaderCount adds inc to the gcsReaderCount counter, with attrs attached
// as string attributes.
func (o *otelMetrics) GCSReaderCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.gcsReaderCount.Add(ctx, inc, opts...)
}
// GCSRequestCount adds inc to the gcsRequestCount counter, with attrs attached
// as string attributes.
func (o *otelMetrics) GCSRequestCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.gcsRequestCount.Add(ctx, inc, opts...)
}
// GCSRequestLatency records value in the gcsRequestLatency histogram, with
// attrs attached as string attributes.
func (o *otelMetrics) GCSRequestLatency(ctx context.Context, value float64, attrs []MetricAttr) {
	opts := attrsToRecordOption(attrs)
	o.gcsRequestLatency.Record(ctx, value, opts...)
}
// GCSReadCount adds inc to the gcsReadCount counter, with attrs attached as
// string attributes.
func (o *otelMetrics) GCSReadCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.gcsReadCount.Add(ctx, inc, opts...)
}
// GCSDownloadBytesCount adds inc to the gcsDownloadBytesCount counter, with
// attrs attached as string attributes.
func (o *otelMetrics) GCSDownloadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.gcsDownloadBytesCount.Add(ctx, inc, opts...)
}
// OpsCount adds inc to the fsOpsCount counter, with attrs attached as string
// attributes.
func (o *otelMetrics) OpsCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.fsOpsCount.Add(ctx, inc, opts...)
}
// OpsLatency records value in the fsOpsLatency histogram, with attrs attached
// as string attributes.
func (o *otelMetrics) OpsLatency(ctx context.Context, value float64, attrs []MetricAttr) {
	opts := attrsToRecordOption(attrs)
	o.fsOpsLatency.Record(ctx, value, opts...)
}
// OpsErrorCount adds inc to the fsOpsErrorCount counter, with attrs attached
// as string attributes.
func (o *otelMetrics) OpsErrorCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.fsOpsErrorCount.Add(ctx, inc, opts...)
}
// FileCacheReadCount adds inc to the fileCacheReadCount counter, with attrs
// attached as string attributes.
func (o *otelMetrics) FileCacheReadCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.fileCacheReadCount.Add(ctx, inc, opts...)
}
// FileCacheReadBytesCount adds inc to the fileCacheReadBytesCount counter,
// with attrs attached as string attributes.
func (o *otelMetrics) FileCacheReadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
	opts := attrsToAddOption(attrs)
	o.fileCacheReadBytesCount.Add(ctx, inc, opts...)
}
// FileCacheReadLatency records value in the fileCacheReadLatency histogram,
// with attrs attached as string attributes.
func (o *otelMetrics) FileCacheReadLatency(ctx context.Context, value float64, attrs []MetricAttr) {
	opts := attrsToRecordOption(attrs)
	o.fileCacheReadLatency.Record(ctx, value, opts...)
}
// NewOTelMetrics creates all instruments on the package-level meters
// (fsOpsMeter, gcsMeter, fileCacheMeter) and returns them as a MetricHandle.
//
// Every instrument is attempted even if an earlier one fails. If any creation
// returns an error, the errors are combined with errors.Join in creation order
// and returned together with a nil handle.
func NewOTelMetrics() (MetricHandle, error) {
	// The observable counter's callback and GCSReadBytesCount share this total.
	readBytes := new(atomic.Int64)
	handle := &otelMetrics{gcsReadBytesCountAtomic: readBytes}

	// One slot per instrument; nil entries are dropped by errors.Join.
	errs := make([]error, 0, 12)
	var err error

	// File system op instruments.
	handle.fsOpsCount, err = fsOpsMeter.Int64Counter("fs/ops_count",
		metric.WithDescription("The cumulative number of ops processed by the file system."))
	errs = append(errs, err)

	handle.fsOpsLatency, err = fsOpsMeter.Float64Histogram("fs/ops_latency",
		metric.WithDescription("The cumulative distribution of file system operation latencies"),
		metric.WithUnit("us"),
		defaultLatencyDistribution)
	errs = append(errs, err)

	handle.fsOpsErrorCount, err = fsOpsMeter.Int64Counter("fs/ops_error_count",
		metric.WithDescription("The cumulative number of errors generated by file system operations"))
	errs = append(errs, err)

	// GCS instruments.
	handle.gcsReadCount, err = gcsMeter.Int64Counter("gcs/read_count",
		metric.WithDescription("Specifies the number of gcs reads made along with type - Sequential/Random"))
	errs = append(errs, err)

	handle.gcsDownloadBytesCount, err = gcsMeter.Int64Counter("gcs/download_bytes_count",
		metric.WithDescription("The cumulative number of bytes downloaded from GCS along with type - Sequential/Random"),
		metric.WithUnit("By"))
	errs = append(errs, err)

	// The observable counter itself is not kept; its callback reports the
	// current value of the shared atomic total each time it is invoked.
	reportReadBytes := func(_ context.Context, obsrv metric.Int64Observer) error {
		obsrv.Observe(readBytes.Load())
		return nil
	}
	_, err = gcsMeter.Int64ObservableCounter("gcs/read_bytes_count",
		metric.WithDescription("The cumulative number of bytes read from GCS objects."),
		metric.WithUnit("By"),
		metric.WithInt64Callback(reportReadBytes))
	errs = append(errs, err)

	handle.gcsReaderCount, err = gcsMeter.Int64Counter("gcs/reader_count",
		metric.WithDescription("The cumulative number of GCS object readers opened or closed."))
	errs = append(errs, err)

	handle.gcsRequestCount, err = gcsMeter.Int64Counter("gcs/request_count",
		metric.WithDescription("The cumulative number of GCS requests processed."))
	errs = append(errs, err)

	handle.gcsRequestLatency, err = gcsMeter.Float64Histogram("gcs/request_latencies",
		metric.WithDescription("The cumulative distribution of the GCS request latencies."),
		metric.WithUnit("ms"))
	errs = append(errs, err)

	// File cache instruments.
	handle.fileCacheReadCount, err = fileCacheMeter.Int64Counter("file_cache/read_count",
		metric.WithDescription("Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false"))
	errs = append(errs, err)

	handle.fileCacheReadBytesCount, err = fileCacheMeter.Int64Counter("file_cache/read_bytes_count",
		metric.WithDescription("The cumulative number of bytes read from file cache along with read type - Sequential/Random"),
		metric.WithUnit("By"))
	errs = append(errs, err)

	handle.fileCacheReadLatency, err = fileCacheMeter.Float64Histogram("file_cache/read_latencies",
		metric.WithDescription("The cumulative distribution of the file cache read latencies along with cache hit - true/false"),
		metric.WithUnit("us"),
		defaultLatencyDistribution)
	errs = append(errs, err)

	if joined := errors.Join(errs...); joined != nil {
		return nil, joined
	}
	return handle, nil
}