internal/self_metrics/self_metrics.go (313 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package self_metrics
import (
"context"
"fmt"
"log"
"path/filepath"
"strings"
"time"
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/GoogleCloudPlatform/ops-agent/apps"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/attribute"
metricapi "go.opentelemetry.io/otel/metric"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Names used to build the self metrics emitted by this package.
const (
	// agentMetricNamespace is the prefix placed in front of every self metric
	// name to form its fully qualified name.
	agentMetricNamespace = "agent.googleapis.com"
	// enabledReceiversMetricName is the gauge that counts enabled receivers
	// by telemetry type and receiver type.
	enabledReceiversMetricName = "agent/ops_agent/enabled_receivers"
	// featureTrackingMetricName is the gauge that reports one data point per
	// tracked configuration feature.
	featureTrackingMetricName = "agent/internal/ops/feature_tracking"
)
func getFullAgentMetricName(metricName string) string {
return fmt.Sprintf("%s/%s", agentMetricNamespace, metricName)
}
// agentMetricsPrefixFormatter returns the fully qualified agent metric name
// for d, obtained by passing d.Name to getFullAgentMetricName.
func agentMetricsPrefixFormatter(d metricdata.Metrics) string {
return getFullAgentMetricName(d.Name)
}
// EnabledReceivers holds per-receiver-type counts of the pipelines found in a
// configuration, kept separately for metrics and logs.
type EnabledReceivers struct {
// MetricsReceiverCountsByType maps a receiver type to the number of
// "metrics" pipelines that use it.
MetricsReceiverCountsByType map[string]int
// LogsReceiverCountsByType maps a receiver type to the number of "logs"
// pipelines that use it.
LogsReceiverCountsByType map[string]int
}
// CountEnabledReceivers tallies the pipelines of uc by receiver type, keeping
// separate tallies for "metrics" and "logs" pipelines. Pipelines of any other
// type are ignored.
//
// The maps in the returned value are always non-nil. If uc.Pipelines fails,
// its error is returned together with the still-empty tallies.
func CountEnabledReceivers(ctx context.Context, uc *confgenerator.UnifiedConfig) (EnabledReceivers, error) {
	counts := EnabledReceivers{
		MetricsReceiverCountsByType: map[string]int{},
		LogsReceiverCountsByType:    map[string]int{},
	}

	pipelines, err := uc.Pipelines(ctx)
	if err != nil {
		return counts, err
	}

	for _, p := range pipelines {
		switch pipelineType, receiverType := p.Types(); pipelineType {
		case "metrics":
			counts.MetricsReceiverCountsByType[receiverType]++
		case "logs":
			counts.LogsReceiverCountsByType[receiverType]++
		}
	}
	return counts, nil
}
// InstrumentEnabledReceiversMetric registers the enabled receivers gauge on
// meter.
//
// The receiver counts are computed from uc once, when this function is
// called; every later collection reports that same snapshot. Each receiver
// type yields one observation labelled with "telemetry_type" ("metrics" or
// "logs") and "receiver_type".
//
// It returns an error if the receivers cannot be counted or the gauge cannot
// be created.
func InstrumentEnabledReceiversMetric(ctx context.Context, uc *confgenerator.UnifiedConfig, meter metricapi.Meter) error {
	eR, err := CountEnabledReceivers(ctx, uc)
	if err != nil {
		return err
	}

	// report emits one observation per receiver type for one telemetry type.
	report := func(o metricapi.Int64Observer, telemetryType string, countsByType map[string]int) {
		for receiverType, n := range countsByType {
			o.Observe(int64(n), metricapi.WithAttributes(
				attribute.String("telemetry_type", telemetryType),
				attribute.String("receiver_type", receiverType),
			))
		}
	}

	callback := func(_ context.Context, o metricapi.Int64Observer) error {
		report(o, "metrics", eR.MetricsReceiverCountsByType)
		report(o, "logs", eR.LogsReceiverCountsByType)
		return nil
	}

	_, err = meter.Int64ObservableGauge(
		enabledReceiversMetricName,
		metricapi.WithInt64Callback(callback),
	)
	return err
}
// InstrumentFeatureTrackingMetric registers the feature tracking gauge on
// meter.
//
// The features are extracted from userUc and mergedUc once, when this
// function is called. On every collection the gauge reports the value 1 for
// each extracted feature, labelled with "module", "feature" (formatted as
// "<kind>:<type>"), "key" (the key segments joined with ".") and "value".
//
// It returns an error if the features cannot be extracted or the gauge cannot
// be created.
func InstrumentFeatureTrackingMetric(ctx context.Context, userUc, mergedUc *confgenerator.UnifiedConfig, meter metricapi.Meter) error {
	features, err := confgenerator.ExtractFeatures(ctx, userUc, mergedUc)
	if err != nil {
		return err
	}

	observeFeatures := func(_ context.Context, observer metricapi.Int64Observer) error {
		for _, f := range features {
			observer.Observe(1, metricapi.WithAttributes(
				attribute.String("module", f.Module),
				attribute.String("feature", fmt.Sprintf("%s:%s", f.Kind, f.Type)),
				attribute.String("key", strings.Join(f.Key, ".")),
				attribute.String("value", f.Value),
			))
		}
		return nil
	}

	_, err = meter.Int64ObservableGauge(
		featureTrackingMetricName,
		metricapi.WithInt64Callback(observeFeatures),
	)
	return err
}
// CreateFeatureTrackingMeterProvider builds the meter provider used for the
// feature tracking metric.
//
// The provider exports through exporter with a periodic reader that runs
// every two hours, attaches res as its resource, and installs a view that
// matches the observable gauge named featureTrackingMetricName and keeps its
// name with the default aggregation.
func CreateFeatureTrackingMeterProvider(exporter metricsdk.Exporter, res *resource.Resource) *metricsdk.MeterProvider {
	reader := metricsdk.NewPeriodicReader(exporter, metricsdk.WithInterval(2*time.Hour))

	view := metricsdk.NewView(
		metricsdk.Instrument{
			Name: featureTrackingMetricName,
			Kind: metricsdk.InstrumentKindObservableGauge,
		},
		metricsdk.Stream{
			Name:        featureTrackingMetricName,
			Aggregation: metricsdk.AggregationDefault{},
		},
	)

	return metricsdk.NewMeterProvider(
		metricsdk.WithReader(reader),
		metricsdk.WithView(view),
		metricsdk.WithResource(res),
	)
}
// CreateEnabledReceiversMeterProvider builds the meter provider used for the
// enabled receivers metric.
//
// The provider exports through exporter with a periodic reader left at its
// default interval, attaches res as its resource, and installs a view that
// matches the observable gauge named enabledReceiversMetricName and keeps its
// name with the default aggregation.
func CreateEnabledReceiversMeterProvider(exporter metricsdk.Exporter, res *resource.Resource) *metricsdk.MeterProvider {
	reader := metricsdk.NewPeriodicReader(exporter)

	view := metricsdk.NewView(
		metricsdk.Instrument{
			Name: enabledReceiversMetricName,
			Kind: metricsdk.InstrumentKindObservableGauge,
		},
		metricsdk.Stream{
			Name:        enabledReceiversMetricName,
			Aggregation: metricsdk.AggregationDefault{},
		},
	)

	return metricsdk.NewMeterProvider(
		metricsdk.WithReader(reader),
		metricsdk.WithView(view),
		metricsdk.WithResource(res),
	)
}
// CollectOpsAgentSelfMetrics reports the Ops Agent self metrics (feature
// tracking and enabled receivers) until ctx is cancelled.
//
// It builds a resource from the GCP and telemetry SDK detectors, creates one
// exporter shared by two meter providers, registers the feature tracking
// gauge (derived from userUc and mergedUc) and the enabled receivers gauge
// (derived from mergedUc), forces a single flush of both providers ten
// seconds after start-up and then blocks until ctx is done. Later exports are
// left to each provider's periodic reader.
//
// Once created, both meter providers are shut down on every return path,
// including when instrumenting a metric fails, so that their periodic readers
// are stopped rather than leaked.
//
// It returns a wrapped error if the resource, the exporter or either gauge
// cannot be set up. After ctx is cancelled it returns nil, unless a provider
// shutdown fails with an error that status.FromError recognises as a gRPC
// status, in which case that error is returned wrapped. Other shutdown errors
// and flush errors are only logged.
func CollectOpsAgentSelfMetrics(ctx context.Context, userUc, mergedUc *confgenerator.UnifiedConfig) (err error) {
	// Resource for GCP and SDK detectors
	res, err := resource.New(ctx,
		resource.WithDetectors(gcp.NewDetector()),
		resource.WithTelemetrySDK(),
	)
	if err != nil {
		return fmt.Errorf("failed to create resource: %w", err)
	}

	// Create exporter pipeline
	exporter, err := mexporter.New(
		mexporter.WithMetricDescriptorTypeFormatter(agentMetricsPrefixFormatter),
		mexporter.WithDisableCreateMetricDescriptors(),
	)
	if err != nil {
		return fmt.Errorf("failed to create exporter: %w", err)
	}

	featureTrackingProvider := CreateFeatureTrackingMeterProvider(exporter, res)
	enabledReceiversProvider := CreateEnabledReceiversMeterProvider(exporter, res)

	// shutdownProvider runs one provider shutdown. Errors that are not gRPC
	// statuses are only logged; a gRPC status error becomes the function's
	// result unless an earlier error is already being returned.
	shutdownProvider := func(shutdown func(context.Context) error) {
		serr := shutdown(ctx)
		if serr == nil {
			return
		}
		myStatus, ok := status.FromError(serr)
		if !ok && myStatus.Code() == codes.Unknown {
			log.Print(serr)
		} else if err == nil {
			err = fmt.Errorf("failed to shutdown meter provider: %w", serr)
		}
	}
	// Registered before instrumenting so that a failure below still stops the
	// providers. The feature tracking provider is shut down first.
	// NOTE(review): when returning because ctx is done, Shutdown receives an
	// already cancelled context, so its final export may fail; confirm that
	// this is intended.
	defer func() {
		shutdownProvider(featureTrackingProvider.Shutdown)
		shutdownProvider(enabledReceiversProvider.Shutdown)
	}()

	err = InstrumentFeatureTrackingMetric(ctx, userUc, mergedUc, featureTrackingProvider.Meter("ops_agent/feature_tracking"))
	if err != nil {
		return fmt.Errorf("failed to instrument feature tracking: %w", err)
	}

	err = InstrumentEnabledReceiversMetric(ctx, mergedUc, enabledReceiversProvider.Meter("ops_agent/self_metrics"))
	if err != nil {
		return fmt.Errorf("failed to instrument enabled receivers: %w", err)
	}

	// The timer fires once, so the forced flush below happens at most one
	// time, ten seconds after start-up.
	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		if ferr := featureTrackingProvider.ForceFlush(ctx); ferr != nil {
			log.Print(ferr)
		}
		if ferr := enabledReceiversProvider.ForceFlush(ctx); ferr != nil {
			log.Print(ferr)
		}
	case <-ctx.Done():
		return nil
	}

	// Keep the providers alive until the caller cancels ctx.
	<-ctx.Done()
	return nil
}
// metricToJson serialises metrics with pmetric's JSON marshaler. On failure
// it returns a nil slice together with the marshaler's error.
func metricToJson(metrics pmetric.Metrics) ([]byte, error) {
	var marshaler pmetric.JSONMarshaler
	encoded, err := marshaler.MarshalMetrics(metrics)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
// CollectEnabledReceiversMetricToOLTPJSON renders the enabled receivers
// metric for uc as JSON-encoded pmetric metrics.
//
// The output holds a single gauge named with the fully qualified enabled
// receivers metric name. It has one integer data point per receiver type:
// first the metrics receivers, then the logs receivers, each group ordered by
// sorted receiver type. Every point carries the "telemetry_type" and
// "receiver_type" attributes.
//
// It returns an error if the receivers cannot be counted or the metrics
// cannot be marshalled.
func CollectEnabledReceiversMetricToOLTPJSON(ctx context.Context, uc *confgenerator.UnifiedConfig) ([]byte, error) {
	eR, err := CountEnabledReceivers(ctx, uc)
	if err != nil {
		return nil, err
	}

	metrics := pmetric.NewMetrics()
	rm := metrics.ResourceMetrics().AppendEmpty()
	// Temporarily add resource attributes. This will be properly populated
	// later in the pipeline by gce resource detector.
	rm.Resource().Attributes().PutStr("k", "v")

	gauge := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	gauge.SetName(getFullAgentMetricName(enabledReceiversMetricName))
	dataPoints := gauge.SetEmptyGauge().DataPoints()

	// appendPoints adds one data point per receiver type for one telemetry
	// type.
	appendPoints := func(telemetryType string, countsByType map[string]int) {
		// Sort map keys to always generate the same json output.
		for _, receiverType := range confgenerator.GetSortedKeys(countsByType) {
			point := dataPoints.AppendEmpty()
			point.SetIntValue(int64(countsByType[receiverType]))
			attrs := point.Attributes()
			attrs.PutStr("telemetry_type", telemetryType)
			attrs.PutStr("receiver_type", receiverType)
		}
	}
	appendPoints("metrics", eR.MetricsReceiverCountsByType)
	appendPoints("logs", eR.LogsReceiverCountsByType)

	return metricToJson(metrics)
}
// CollectFeatureTrackingMetricToOTLPJSON renders the feature tracking metric
// for userUc and mergedUc as JSON-encoded pmetric metrics.
//
// The output holds a single gauge named with the fully qualified feature
// tracking metric name. It has one data point of value 1 per extracted
// feature, in extraction order, carrying the "module", "feature" (formatted
// as "<kind>:<type>"), "key" (segments joined with ".") and "value"
// attributes.
//
// It returns an error if the features cannot be extracted or the metrics
// cannot be marshalled.
func CollectFeatureTrackingMetricToOTLPJSON(ctx context.Context, userUc, mergedUc *confgenerator.UnifiedConfig) ([]byte, error) {
	features, err := confgenerator.ExtractFeatures(ctx, userUc, mergedUc)
	if err != nil {
		return nil, err
	}

	metrics := pmetric.NewMetrics()
	rm := metrics.ResourceMetrics().AppendEmpty()
	rm.Resource().Attributes().PutStr("k", "v") // Resources can't be empty

	gauge := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	gauge.SetName(getFullAgentMetricName(featureTrackingMetricName))
	points := gauge.SetEmptyGauge().DataPoints()

	for _, f := range features {
		p := points.AppendEmpty()
		p.SetIntValue(1)

		attrs := p.Attributes()
		attrs.PutStr("module", f.Module)
		attrs.PutStr("feature", fmt.Sprintf("%s:%s", f.Kind, f.Type))
		attrs.PutStr("key", strings.Join(f.Key, "."))
		attrs.PutStr("value", f.Value)
	}

	return metricToJson(metrics)
}
// getUserAndMergedConfigs reads the configuration at userConfPath and
// returns, in this order, the user's own configuration and the configuration
// obtained by merging that file with the built-in configuration structs.
//
// When reading the user file yields a nil configuration without an error, an
// empty configuration is returned in its place. Any read or merge error is
// returned with both configurations nil.
func getUserAndMergedConfigs(ctx context.Context, userConfPath string) (*confgenerator.UnifiedConfig, *confgenerator.UnifiedConfig, error) {
	user, err := confgenerator.ReadUnifiedConfigFromFile(ctx, userConfPath)
	switch {
	case err != nil:
		return nil, nil, err
	case user == nil:
		user = new(confgenerator.UnifiedConfig)
	}

	merged, err := confgenerator.MergeConfFiles(ctx, userConfPath, apps.BuiltInConfStructs)
	if err != nil {
		return nil, nil, err
	}
	return user, merged, nil
}
// GenerateOpsAgentSelfMetricsOTLPJSON writes the Ops Agent self metrics for
// the configuration file at config into outDir as two JSON files:
// "feature_tracking_otlp.json" and "enabled_receivers_otlp.json".
//
// The feature tracking file is generated and written before the enabled
// receivers metric is generated, so it may already exist on disk when a later
// step fails. Errors from loading the configuration are returned as is;
// generation and write errors are returned wrapped.
func GenerateOpsAgentSelfMetricsOTLPJSON(ctx context.Context, config, outDir string) (err error) {
	userUc, mergedUc, err := getUserAndMergedConfigs(ctx, config)
	if err != nil {
		return err
	}

	featureTrackingJSON, err := CollectFeatureTrackingMetricToOTLPJSON(ctx, userUc, mergedUc)
	if err != nil {
		return fmt.Errorf("failed to generate feature tracking metric otlp json: %w", err)
	}
	featureTrackingPath := filepath.Join(outDir, "feature_tracking_otlp.json")
	err = confgenerator.WriteConfigFile(featureTrackingJSON, featureTrackingPath)
	if err != nil {
		return fmt.Errorf("failed to write feature tracking metric otlp json file: %w", err)
	}

	enabledReceiversJSON, err := CollectEnabledReceiversMetricToOLTPJSON(ctx, mergedUc)
	if err != nil {
		return fmt.Errorf("failed to generate enabled receivers metric otlp json: %w", err)
	}
	enabledReceiversPath := filepath.Join(outDir, "enabled_receivers_otlp.json")
	err = confgenerator.WriteConfigFile(enabledReceiversJSON, enabledReceiversPath)
	if err != nil {
		return fmt.Errorf("failed to write enabled receivers metric otlp json file: %w", err)
	}

	return nil
}