internal/metricregistry/metricregistry.go (102 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package metricregistry implements a metric registry and provides a way to
// collect and export metrics.
package metricregistry
import (
"context"
"sync"
"time"
"github.com/GoogleCloudPlatform/galog"
acmpb "github.com/GoogleCloudPlatform/google-guest-agent/internal/acp/proto/google_guest_agent/acp"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/acs/client"
"google.golang.org/protobuf/proto"
)
const (
// messageType is key in labels for message type.
messageType = "message_type"
// GuestAgentModuleMetricMsg is the message type label to use with any metrics
// sent by agent.
guestAgentModuleMetricMsg = "agent_controlplane.GuestAgentModuleMetric"
// DefaultMaxRecords is the default maximum number of records to store in
// in-memory registry before flushing.
DefaultMaxRecords = 100
// DefaultFlushInterval is the default interval at which metrics are flushed.
DefaultFlushInterval = time.Minute
)
var (
// registryMu is a mutex to protect access to registries.
registryMu = sync.Mutex{}
// registries is a map of metric registries currently active.
registries = make(map[string]*MetricRegistry)
)
// MetricRegistry is a registry for metrics.
type MetricRegistry struct {
name string
metricsMu sync.Mutex
metrics []proto.Message
flushInterval time.Duration
maxRecords int
}
// New creates a new metric registry instance which buffers metrics for the
// specified duration and creates a job to flush them periodically. If a
// registry with the same name already exists, it returns the existing registry.
func New(ctx context.Context, d time.Duration, maxRecords int, name string) *MetricRegistry {
registryMu.Lock()
defer registryMu.Unlock()
if registries[name] != nil {
galog.Infof("Metric registry for %q already exists, returning existing registry", name)
return registries[name]
}
galog.Infof("Creating metric registry for %q with flush interval: %v, max records: %d", name, d, maxRecords)
mr := &MetricRegistry{
name: name,
flushInterval: d,
maxRecords: maxRecords,
}
registries[name] = mr
go mr.runFlusher(ctx)
return mr
}
// Run implements the scheduler job interface which flushes the metrics to ACS.
func (mr *MetricRegistry) runFlusher(ctx context.Context) {
ticker := time.NewTicker(mr.flushInterval)
defer ticker.Stop()
// Every manager that handles jobs (Event manager, scheduler, etc) might need
// to record metrics on-behalf of the jobs it is managing. To avoid circular
// dependency, this flusher implements its own scheduler.
for {
select {
case <-ticker.C:
mr.Flush(ctx)
case <-ctx.Done():
galog.Infof("Context cancelled, returning from flusher job for %q", mr.name)
return
}
}
}
// isMetricValid returns true if the metric is a known valid metric type.
func isMetricValid(metric proto.Message) bool {
if metric == nil {
return false
}
switch metric.(type) {
case *acmpb.GuestAgentModuleMetric, *acmpb.GuestAgentModuleMetrics:
return true
default:
return false
}
}
// size returns the current number of entries in the registry.
func (mr *MetricRegistry) size() int {
mr.metricsMu.Lock()
defer mr.metricsMu.Unlock()
return len(mr.metrics)
}
// addEntry adds a metric to the registry.
func (mr *MetricRegistry) addEntry(metric proto.Message) {
mr.metricsMu.Lock()
defer mr.metricsMu.Unlock()
mr.metrics = append(mr.metrics, metric)
}
// Record adds a metric to buffered registry. If the registry is full, it
// flushes the metrics before adding the new metric.
func (mr *MetricRegistry) Record(ctx context.Context, metric proto.Message) {
if !isMetricValid(metric) {
galog.V(2).Warnf("Ignoring invalid metric: %+v", metric)
return
}
if mr.size()+1 >= mr.maxRecords {
// Flush the metrics if the registry is full.
mr.Flush(ctx)
}
mr.addEntry(metric)
}
// Flush forces immediate flush of metrics recorded so far instead of waiting on
// next interval.
func (mr *MetricRegistry) Flush(ctx context.Context) {
// Get the metrics to flush outside the lock to avoid holding the lock for
// too long as ACS flush can be slow based on the network conditions.
galog.V(2).Debugf("Flushing metrics for %q", mr.name)
mr.metricsMu.Lock()
toFlush := mr.metrics
mr.metrics = nil
mr.metricsMu.Unlock()
for _, metric := range toFlush {
galog.V(2).Debugf("Flushing metric: %+v", metric)
_, err := client.SendMessage(ctx, map[string]string{messageType: guestAgentModuleMetricMsg}, metric)
if err != nil {
// Client internally retries these errors so returning here is not
// actionable and simply logged for debugging.
galog.V(2).Warnf("Failed to send metric: %+v to ACS: %v", metric, err)
}
}
}