internal/workloadmanager/workloadcollector.go (213 lines of code) (raw):
/*
Copyright 2025 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package workloadmanager collects workload manager metrics and sends them to Data Warehouse.
package workloadmanager
import (
"bufio"
"bytes"
"context"
"io"
"os"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/workloadagent/internal/daemon/configuration"
"github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics"
cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/gce/wlm"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
dwpb "github.com/GoogleCloudPlatform/workloadagentplatform/sharedprotos/datawarehouse"
)
// ConfigFileReader opens the file at the given path and returns a reader for
// its contents, or an error if the file cannot be read.
//
// Callers in this package close the returned reader when they are done with it.
type ConfigFileReader func(string) (io.ReadCloser, error)
// WorkloadType is a string-backed identifier for the kind of workload a set of
// metrics belongs to.
//
// Values read from the override file's "workload_type" key are converted to
// this type as-is, so a WorkloadType is not guaranteed to be one of the
// constants declared in this package.
type WorkloadType string
const (
// UNKNOWN is the workload type used when the workload cannot be identified.
// It is reported to Data Warehouse as WORKLOAD_TYPE_UNSPECIFIED.
UNKNOWN WorkloadType = "UNKNOWN"
// ORACLE identifies an Oracle workload.
ORACLE WorkloadType = "ORACLE"
// MYSQL identifies a MySQL workload.
MYSQL WorkloadType = "MYSQL"
// REDIS identifies a Redis workload.
REDIS WorkloadType = "REDIS"
// collectionFrequency is the interval between two consecutive metric
// collections; it is used as the period of the collection ticker.
collectionFrequency = 5 * time.Minute
)
// WorkloadMetrics holds the metrics collected for a single workload from the
// override configuration file, which exists for testing purposes.
// Future enhancements will include the collection of actual WLM metrics.
type WorkloadMetrics struct {
// WorkloadType is the workload these metrics belong to.
WorkloadType WorkloadType
// Metrics maps metric names to their values; it is sent to Data Warehouse
// as the validation details of the insight.
Metrics map[string]string
}
// WLMWriter is an interface for writing insights to Data Warehouse.
//
// It is the seam through which this package talks to the WLM service, so any
// type providing WriteInsightAndGetResponse can stand in for the real client.
type WLMWriter interface {
// WriteInsightAndGetResponse writes writeInsightRequest for the given
// project and location and returns the service response or an error.
WriteInsightAndGetResponse(project, location string, writeInsightRequest *dwpb.WriteInsightRequest) (*wlm.WriteInsightResponse, error)
}
// sendMetricsParams defines the set of parameters required to call
// sendMetricsToDataWarehouse.
type sendMetricsParams struct {
// wm is the list of per-workload metrics; one insight is sent per entry.
wm []WorkloadMetrics
// cp holds the cloud properties used to address and label each insight.
cp *cpb.CloudProperties
// wlmService is the writer used to send each insight.
wlmService WLMWriter
}
// SendDataInsightParams defines the set of parameters required to call SendDataInsight.
type SendDataInsightParams struct {
// WLMetrics is the single workload's metrics to send.
WLMetrics WorkloadMetrics
// CloudProps supplies the project, region and instance details for the request.
CloudProps *cpb.CloudProperties
// WLMService is the writer that performs the WriteInsight call.
WLMService WLMWriter
}
// metricEmitter is a container for constructing metrics from an override configuration file.
//
// It carries parsing state between successive getMetric calls so that one
// scanner can be consumed workload group by workload group.
type metricEmitter struct {
// scanner yields the lines of the override file.
scanner *bufio.Scanner
// workloadType is the type of the workload group currently being built;
// it is empty until the first "workload_type" line has been read.
workloadType WorkloadType
// metrics accumulates the key/value pairs of the workload group currently being built.
metrics map[string]string
}
// Service is used to collect workload manager metrics and send them to Data Warehouse.
type Service struct {
// Config is the agent configuration; its cloud properties are attached to
// every insight that is sent.
Config *cpb.Configuration
// Client is the writer used to send insights to Data Warehouse.
Client WLMWriter
}
// MetricOverridePath is the path to the metric override file.
//
// Both the initial log pass and every periodic collection read this file.
const MetricOverridePath = "/etc/google-cloud-workload-agent/wlmmetricoverride.yaml"
// Client builds a WLM client that targets the Data Warehouse endpoint taken
// from config.
//
// When the client cannot be created, a nil WLMWriter is returned together with
// the underlying error.
func Client(ctx context.Context, config *cpb.Configuration) (WLMWriter, error) {
	endpoint := config.GetDataWarehouseEndpoint()
	wlmClient, err := wlm.NewWLMClient(ctx, endpoint)
	if err != nil {
		// Return a literal nil so callers never see a non-nil interface
		// wrapping a nil client.
		return nil, err
	}
	return wlmClient, nil
}
// CollectAndSendMetricsToDataWarehouse periodically reads workload metrics from
// the override file and sends them to Data Warehouse.
//
// It returns immediately when the override file cannot be read. Otherwise it
// collects and sends once right away, then again every collectionFrequency,
// until ctx is cancelled. The a parameter is not used.
func (s *Service) CollectAndSendMetricsToDataWarehouse(ctx context.Context, a any) {
	if ok := readAndLogMetricOverrideYAML(ctx, readFileWrapper); !ok {
		return
	}

	ticker := time.NewTicker(collectionFrequency)
	defer ticker.Stop()

	for {
		sendMetricsToDataWarehouse(ctx, sendMetricsParams{
			wm:         collectOverrideMetrics(ctx, readFileWrapper),
			cp:         s.Config.GetCloudProperties(),
			wlmService: s.Client,
		})

		// Block until the next tick or until the caller cancels.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			log.CtxLogger(ctx).Info("Metric collection override cancellation requested")
			return
		}
	}
}
// readAndLogMetricOverrideYAML opens the metric override file through reader
// and logs each of its lines at debug level.
//
// It returns false when the file cannot be opened and true otherwise; a scan
// error after a successful open is only logged as a warning.
func readAndLogMetricOverrideYAML(ctx context.Context, reader ConfigFileReader) bool {
	overrideFile, openErr := reader(MetricOverridePath)
	if openErr != nil {
		log.CtxLogger(ctx).Debugw("Could not read the metric override file", "error", openErr)
		return false
	}
	defer overrideFile.Close()

	log.CtxLogger(ctx).Infow("Reading override metrics from yaml file", "file", MetricOverridePath)

	lines := bufio.NewScanner(overrideFile)
	for lines.Scan() {
		log.CtxLogger(ctx).Debug("Override metric line: " + lines.Text())
	}
	if scanErr := lines.Err(); scanErr != nil {
		log.CtxLogger(ctx).Warnw("Could not read from the override metrics file", "error", scanErr)
	}
	return true
}
// collectOverrideMetrics parses the metric override file into one
// WorkloadMetrics entry per workload group.
//
// When the file cannot be opened it logs the error at debug level and returns
// an empty, non-nil slice. Otherwise the result always holds at least one
// entry, because the group that is open when the file ends is always appended.
func collectOverrideMetrics(ctx context.Context, reader ConfigFileReader) []WorkloadMetrics {
	overrideFile, err := reader(MetricOverridePath)
	if err != nil {
		log.CtxLogger(ctx).Debugw("Could not read the metric override file", "error", err)
		return []WorkloadMetrics{}
	}
	defer overrideFile.Close()

	emitter := metricEmitter{scanner: bufio.NewScanner(overrideFile)}
	var collected []WorkloadMetrics
	for done := false; !done; {
		var entry WorkloadMetrics
		entry.WorkloadType, entry.Metrics, done = emitter.getMetric(ctx)
		collected = append(collected, entry)
	}
	return collected
}
// getMetric scans forward in the override file and returns the next complete
// workload group.
//
// Every line that is neither blank nor a comment (first non-space character
// '#') is expected to have the form "key: value". A "workload_type" key starts
// a new group; any other key is recorded as a metric of the group currently
// being built. Metrics that appear before the first "workload_type" line are
// attributed to the first group. Lines without a ':' are logged and skipped.
//
// It returns the workload type of the finished group, its metrics, and a
// boolean that is true once the scanner is exhausted (the returned group is the
// last one) and false when another group follows.
func (e *metricEmitter) getMetric(ctx context.Context) (WorkloadType, map[string]string, bool) {
	if e.metrics == nil {
		e.metrics = make(map[string]string)
	}
	for e.scanner.Scan() {
		raw := e.scanner.Text()
		// Trim before classifying the line so that indented comments and
		// whitespace-only lines are skipped instead of being parsed as
		// metrics or reported as invalid.
		line := strings.TrimSpace(raw)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, value, found := strings.Cut(line, ":")
		if !found {
			log.CtxLogger(ctx).Warn("Invalid format: " + raw)
			continue
		}
		key = strings.TrimSpace(key)
		value = strings.TrimSpace(value)
		if key != "workload_type" {
			e.metrics[key] = value
			continue
		}
		// First workload type in the file: remember it and keep scanning for
		// its metrics.
		if e.workloadType == "" {
			e.workloadType = WorkloadType(value)
			continue
		}
		// A new workload type closes the current group. Hand the finished
		// group back and start a fresh one for the type just read.
		finishedType, finishedMetrics := e.workloadType, e.metrics
		e.workloadType = WorkloadType(value)
		e.metrics = make(map[string]string)
		return finishedType, finishedMetrics, false
	}
	if err := e.scanner.Err(); err != nil {
		log.CtxLogger(ctx).Warnw("Could not read from the override metrics file", "error", err)
	}
	// End of input: whatever has been accumulated is the last group.
	return e.workloadType, e.metrics, true
}
// sendMetricsToDataWarehouse sends one insight per entry of params.wm, each in
// its own goroutine, and returns once all sends have finished.
//
// The results of the individual sends are discarded here; SendDataInsight logs
// its own failures.
func sendMetricsToDataWarehouse(ctx context.Context, params sendMetricsParams) {
	log.CtxLogger(ctx).Info("Sending metrics to Data Warehouse")

	var pending sync.WaitGroup
	pending.Add(len(params.wm))
	for i := range params.wm {
		// Built per iteration so each goroutine owns its own copy.
		insight := SendDataInsightParams{
			WLMetrics:  params.wm[i],
			CloudProps: params.cp,
			WLMService: params.wlmService,
		}
		go func() {
			defer pending.Done()
			SendDataInsight(ctx, insight)
		}()
	}
	pending.Wait()
}
// SendDataInsight builds a WriteInsightRequest from params and writes it to
// Data Warehouse through params.WLMService.
//
// On success it returns the service response. On failure it logs the error,
// records a DataWarehouseWriteInsightFailure usage metric, and returns a nil
// response with the error.
func SendDataInsight(ctx context.Context, params SendDataInsightParams) (*wlm.WriteInsightResponse, error) {
	request := createWriteInsightRequest(ctx, params.WLMetrics, params.CloudProps)
	project, region := params.CloudProps.GetProjectId(), params.CloudProps.GetRegion()

	response, err := params.WLMService.WriteInsightAndGetResponse(project, region, request)
	if err == nil {
		log.CtxLogger(ctx).Infow("Sent metrics to Data Warehouse", "workload_type", params.WLMetrics.WorkloadType)
		return response, nil
	}

	log.CtxLogger(ctx).Errorw("Failed to send metrics to Data Warehouse", "error", err, "workload_type", params.WLMetrics.WorkloadType)
	usagemetrics.Error(usagemetrics.DataWarehouseWriteInsightFailure)
	return nil, err
}
// createWriteInsightRequest converts wm and cp into the WriteInsightRequest
// sent to Data Warehouse.
//
// ORACLE, MYSQL and REDIS map to their corresponding TorsoValidation workload
// types; UNKNOWN and any unrecognised value map to WORKLOAD_TYPE_UNSPECIFIED.
func createWriteInsightRequest(ctx context.Context, wm WorkloadMetrics, cp *cpb.CloudProperties) *dwpb.WriteInsightRequest {
	log.CtxLogger(ctx).Debugw("Create WriteInsightRequest and call WriteInsight", "workload_type", wm.WorkloadType)

	var torsoType dwpb.TorsoValidation_WorkloadType
	switch wm.WorkloadType {
	case ORACLE:
		torsoType = dwpb.TorsoValidation_ORACLE
	case MYSQL:
		torsoType = dwpb.TorsoValidation_MYSQL
	case REDIS:
		torsoType = dwpb.TorsoValidation_REDIS
	default:
		torsoType = dwpb.TorsoValidation_WORKLOAD_TYPE_UNSPECIFIED
	}

	instanceID := cp.GetInstanceId()
	validation := &dwpb.TorsoValidation{
		WorkloadType:      torsoType,
		ValidationDetails: wm.Metrics,
		ProjectId:         cp.GetProjectId(),
		InstanceName:      cp.GetInstanceName(),
		AgentVersion:      configuration.AgentVersion,
	}
	return &dwpb.WriteInsightRequest{
		Insight: &dwpb.Insight{
			InstanceId:      instanceID,
			TorsoValidation: validation,
		},
	}
}
// readFileWrapper is the default ConfigFileReader. It loads the whole file at
// path into memory and returns an in-memory reader over its contents.
//
// Closing the returned reader is a no-op. If the file cannot be read, a nil
// reader and the error from os.ReadFile are returned.
func readFileWrapper(path string) (io.ReadCloser, error) {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return nil, readErr
	}
	inMemory := bytes.NewReader(raw)
	return io.NopCloser(inMemory), nil
}