internal/workloadmanager/workloadcollector.go (463 lines of code) (raw):
/*
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package workloadmanager collects workload manager metrics and sends them to Cloud Monitoring
package workloadmanager
import (
"bufio"
"context"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
mpb "google.golang.org/genproto/googleapis/monitoring/v3"
mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/cloudmonitoring"
"golang.org/x/exp/slices"
"github.com/GoogleCloudPlatform/sapagent/internal/configuration"
"github.com/GoogleCloudPlatform/sapagent/internal/instanceinfo"
"github.com/GoogleCloudPlatform/sapagent/internal/usagemetrics"
"github.com/GoogleCloudPlatform/sapagent/internal/utils/protostruct"
cnfpb "github.com/GoogleCloudPlatform/sapagent/protos/configuration"
ipb "github.com/GoogleCloudPlatform/sapagent/protos/instanceinfo"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/recovery"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/timeseries"
dwpb "github.com/GoogleCloudPlatform/workloadagentplatform/sharedprotos/datawarehouse"
)
/*
ConfigFileReader abstracts loading and reading files into an io.ReadCloser object.
The caller is responsible for closing the returned ReadCloser.

Example usage:

	ConfigFileReader(func(path string) (io.ReadCloser, error) {
		file, err := os.Open(path)
		var f io.ReadCloser = file
		return f, err
	})
*/
type ConfigFileReader func(string) (io.ReadCloser, error)
// WorkloadMetrics is a container for monitoring TimeSeries metrics.
type WorkloadMetrics struct {
	// Metrics holds the collected validation time series destined for
	// Cloud Monitoring and/or the WLM Data Warehouse.
	Metrics []*mrpb.TimeSeries
}
// metricEmitter is a container for constructing metrics from an override configuration file.
type metricEmitter struct {
	scanner       *bufio.Scanner // line scanner over the override file contents
	tmpMetricName string         // a "metric:" name read while finishing the previous entry, carried into the next getMetric call
}
// wlmInterface abstracts the Workload Manager Data Warehouse client so callers
// only depend on the single WriteInsight operation used by this package.
type wlmInterface interface {
	// WriteInsight sends a write insight request for the given project and regional location.
	WriteInsight(project, location string, writeInsightRequest *dwpb.WriteInsightRequest) error
}
// sendMetricsParams defines the set of parameters required to call sendMetrics.
type sendMetricsParams struct {
	wm                    WorkloadMetrics                   // the collected metrics to write
	cp                    *ipb.CloudProperties              // instance cloud properties (project, zone, region)
	bareMetal             bool                              // bare metal flag from the agent configuration; selects region over zone
	sendToCloudMonitoring bool                              // when true, use the legacy Cloud Monitoring workflow
	timeSeriesCreator     cloudmonitoring.TimeSeriesCreator // client used for Cloud Monitoring time series writes
	backOffIntervals      *cloudmonitoring.BackOffIntervals // retry backoff configuration for Cloud Monitoring writes
	wlmService            wlmInterface                      // Data Warehouse write client
}
var (
	// now returns the current Unix time; declared as a variable so tests can
	// substitute a deterministic clock.
	now = currentTime
	// dailyUsageRoutine logs the WLM collection usage action once per day.
	dailyUsageRoutine *recovery.RecoverableRoutine
	// collectRoutine runs the main metric collection loop (see start).
	collectRoutine *recovery.RecoverableRoutine
)

// metricOverridePath is the well-known location of the optional metric override file.
const metricOverridePath = "/etc/google-cloud-sap-agent/wlmmetricoverride.yaml"

// metricTypePrefix is prepended to validation metric names to form full metric types.
const metricTypePrefix = "workload.googleapis.com/sap/validation/"
// currentTime reports the present moment as seconds since the Unix epoch.
func currentTime() int64 {
	t := time.Now()
	return t.Unix()
}
// start runs the Workload Manager collection loop. The argument a is expected
// to be a Parameters value (it is delivered through a recoverable routine,
// hence the `any` type). One collection round runs immediately; subsequent
// rounds are driven by tickers until the context is cancelled.
func start(ctx context.Context, a any) {
	var params Parameters
	if v, ok := a.(Parameters); ok {
		params = v
	} else {
		log.CtxLogger(ctx).Error("Cannot collect Workload Manager metrics, no collection configuration detected")
		return
	}
	// Log usagemetric if hdbuserstore key is configured.
	if params.Config.GetCollectionConfiguration().GetWorkloadValidationDbMetricsConfig().GetHdbuserstoreKey() != "" {
		usagemetrics.Action(usagemetrics.HDBUserstoreKeyConfigured)
	}
	log.CtxLogger(ctx).Infow("Starting collection of Workload Manager metrics", "definitionVersion", params.WorkloadConfig.GetVersion())
	// Frequency for the configurable (workload validation) metrics.
	cmf := time.Duration(params.Config.GetCollectionConfiguration().GetWorkloadValidationMetricsFrequency()) * time.Second
	if cmf <= 0 {
		// default it to 5 minutes
		cmf = time.Duration(5) * time.Minute
	}
	configurableMetricsTicker := time.NewTicker(cmf)
	defer configurableMetricsTicker.Stop()
	// Frequency for the database metrics.
	dbmf := time.Duration(params.Config.GetCollectionConfiguration().GetWorkloadValidationDbMetricsFrequency()) * time.Second
	if dbmf <= 0 {
		// default it to 1 hour
		dbmf = time.Duration(3600) * time.Second
	}
	databaseMetricTicker := time.NewTicker(dbmf)
	defer databaseMetricTicker.Stop()
	heartbeatTicker := params.HeartbeatSpec.CreateTicker()
	defer heartbeatTicker.Stop()
	// Do not wait for the first tick and start metric collection immediately.
	select {
	case <-ctx.Done():
		log.CtxLogger(ctx).Debug("Workload metrics cancellation requested")
		return
	default:
		collectWorkloadMetricsOnce(ctx, params)
		if err := collectDBMetricsOnce(ctx, params); err != nil {
			log.CtxLogger(ctx).Warn(err)
		}
	}
	for {
		select {
		case <-ctx.Done():
			log.CtxLogger(ctx).Debug("Workload metrics cancellation requested")
			return
		case cd := <-params.WorkloadConfigCh:
			// An updated collection definition arrived; use it for subsequent rounds.
			params.WorkloadConfig = cd.GetWorkloadValidation()
			log.CtxLogger(ctx).Infow("Received updated workload collection configuration", "version", params.WorkloadConfig.GetVersion())
		case <-heartbeatTicker.C:
			params.HeartbeatSpec.Beat()
		case <-configurableMetricsTicker.C:
			collectWorkloadMetricsOnce(ctx, params)
		case <-databaseMetricTicker.C:
			if err := collectDBMetricsOnce(ctx, params); err != nil {
				log.CtxLogger(ctx).Warn(err)
			}
		}
	}
}
// collectWorkloadMetricsOnce issues a heartbeat and initiates one round of metric collection.
//
// When params.Remote is set, collection is delegated to the remote-instance
// workflow; otherwise metrics are collected locally and sent immediately.
func collectWorkloadMetricsOnce(ctx context.Context, params Parameters) {
	params.HeartbeatSpec.Beat()
	if params.Remote {
		log.CtxLogger(ctx).Info("Collecting metrics from remote instances")
		collectAndSendRemoteMetrics(ctx, params)
		return
	}
	log.CtxLogger(ctx).Info("Collecting metrics from this instance")
	metrics := collectMetricsFromConfig(ctx, params, metricOverridePath)
	sendMetrics(ctx, sendMetricsParams{
		wm:                    metrics,
		cp:                    params.Config.GetCloudProperties(),
		bareMetal:             params.Config.GetBareMetal(),
		sendToCloudMonitoring: params.Config.GetSupportConfiguration().GetSendWorkloadValidationMetricsToCloudMonitoring().GetValue(),
		timeSeriesCreator:     params.TimeSeriesCreator,
		backOffIntervals:      params.BackOffs,
		wlmService:            params.WLMService,
	})
}
// StartMetricsCollection continuously collects Workload Manager metrics for SAP workloads.
// Returns true if the collection goroutine is started, and false otherwise.
//
// Collection is skipped (returns false) when collection is disabled in the
// configuration, on Windows, or when no workload validation definition is set.
func StartMetricsCollection(ctx context.Context, params Parameters) bool {
	if params.Config.GetCollectionConfiguration().GetWorkloadValidationRemoteCollection() == nil &&
		!params.Config.GetCollectionConfiguration().GetCollectWorkloadValidationMetrics().GetValue() {
		log.CtxLogger(ctx).Info("Not collecting Workload Manager metrics")
		return false
	}
	if params.OSType == "windows" {
		log.CtxLogger(ctx).Warn("Workload Manager metrics collection is not supported for windows platform")
		return false
	}
	if params.WorkloadConfig == nil {
		log.CtxLogger(ctx).Error("Cannot collect Workload Manager metrics, no collection configuration detected")
		return false
	}
	// Routine that logs a usage action once per day; the action id is passed
	// through RoutineArg below.
	dailyUsageRoutine = &recovery.RecoverableRoutine{
		Routine: func(_ context.Context, a any) {
			if v, ok := a.(int); ok {
				usagemetrics.LogActionDaily(v)
			}
		},
		ErrorCode:           usagemetrics.UsageMetricsDailyLogError,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 24 * time.Hour,
	}
	if params.Remote {
		dailyUsageRoutine.RoutineArg = usagemetrics.RemoteWLMMetricsCollection
	} else {
		dailyUsageRoutine.RoutineArg = usagemetrics.CollectWLMMetrics
	}
	dailyUsageRoutine.StartRoutine(ctx)
	// Routine running the main collection loop (see start).
	collectRoutine = &recovery.RecoverableRoutine{
		Routine:             start,
		RoutineArg:          params,
		ErrorCode:           usagemetrics.WLMCollectionRoutineFailure,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 10 * time.Second,
	}
	collectRoutine.StartRoutine(ctx)
	return true
}
// collectMetricsFromConfig returns the result of metric collection using the
// collection definition configuration supplied to the agent.
//
// The results of this function can be overridden using a metricOverride file.
func collectMetricsFromConfig(ctx context.Context, params Parameters, metricOverride string) WorkloadMetrics {
	log.CtxLogger(ctx).Info("Collecting Workload Manager metrics...")
	// An override file, when present, short-circuits normal collection.
	if fileInfo, err := params.OSStatReader(metricOverride); fileInfo != nil && err == nil {
		log.CtxLogger(ctx).Info("Using override metrics from yaml file")
		return collectOverrideMetrics(ctx, params.Config, params.ConfigFileReader, metricOverride)
	}
	// Read the latest instance info for this system.
	params.InstanceInfoReader.Read(ctx, params.Config, instanceinfo.NetworkInterfaceAddressMap)
	// Collect all metrics specified by the WLM Validation config. Each metric
	// family runs on its own recoverable routine; corosync collection is
	// chained after pacemaker because it consumes the pacemaker result.
	var system, corosync, hana, netweaver, pacemaker, custom WorkloadMetrics
	var wg sync.WaitGroup
	wg.Add(5)
	systemMetricsRoutine := &recovery.RecoverableRoutine{
		Routine: func(ctx context.Context, a any) {
			defer wg.Done()
			system = CollectSystemMetricsFromConfig(ctx, params)
		},
		ErrorCode:           usagemetrics.WLMCollectionSystemRoutineFailure,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 5 * time.Second,
	}
	systemMetricsRoutine.StartRoutine(ctx)
	hanaMetricsRoutine := &recovery.RecoverableRoutine{
		Routine: func(ctx context.Context, a any) {
			defer wg.Done()
			hana = CollectHANAMetricsFromConfig(ctx, params)
		},
		ErrorCode:           usagemetrics.WLMCollectionHANARoutineFailure,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 5 * time.Second,
	}
	hanaMetricsRoutine.StartRoutine(ctx)
	netweaverMetricsRoutine := &recovery.RecoverableRoutine{
		Routine: func(ctx context.Context, a any) {
			defer wg.Done()
			netweaver = CollectNetWeaverMetricsFromConfig(ctx, params)
		},
		ErrorCode:           usagemetrics.WLMCollectionNetweaverRoutineFailure,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 5 * time.Second,
	}
	netweaverMetricsRoutine.StartRoutine(ctx)
	pacemakerMetricsRoutine := &recovery.RecoverableRoutine{
		Routine: func(ctx context.Context, a any) {
			defer wg.Done()
			pacemaker = CollectPacemakerMetricsFromConfig(ctx, params)
			// The pacemaker metric value feeds into corosync collection.
			v := 0.0
			if len(pacemaker.Metrics) > 0 && len(pacemaker.Metrics[0].Points) > 0 {
				v = pacemaker.Metrics[0].Points[0].GetValue().GetDoubleValue()
			}
			corosync = CollectCorosyncMetricsFromConfig(ctx, params, v)
		},
		ErrorCode:           usagemetrics.WLMCollectionPacemakerRoutineFailure,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 5 * time.Second,
	}
	pacemakerMetricsRoutine.StartRoutine(ctx)
	customMetricsRoutine := &recovery.RecoverableRoutine{
		Routine: func(ctx context.Context, a any) {
			defer wg.Done()
			custom = CollectCustomMetricsFromConfig(ctx, params)
		},
		ErrorCode:           usagemetrics.WLMCollectionCustomRoutineFailure,
		UsageLogger:         *usagemetrics.Logger,
		ExpectedMinDuration: 5 * time.Second,
	}
	customMetricsRoutine.StartRoutine(ctx)
	wg.Wait()
	// Append the shared system metric labels to all other metrics. Guard every
	// [0] index so that an empty result from any collector (e.g. after a
	// recovered routine failure) cannot cause an index-out-of-range panic.
	if len(system.Metrics) > 0 {
		shared := sharedLabels(system.Metrics[0].GetMetric().GetLabels())
		for _, wm := range []WorkloadMetrics{corosync, hana, netweaver, pacemaker, custom} {
			if len(wm.Metrics) > 0 && wm.Metrics[0].GetMetric() != nil {
				appendLabels(wm.Metrics[0].GetMetric().GetLabels(), shared)
			}
		}
	}
	// Concatenate all of the metrics together.
	allMetrics := []*mrpb.TimeSeries{}
	allMetrics = append(allMetrics, system.Metrics...)
	allMetrics = append(allMetrics, corosync.Metrics...)
	allMetrics = append(allMetrics, hana.Metrics...)
	allMetrics = append(allMetrics, netweaver.Metrics...)
	allMetrics = append(allMetrics, pacemaker.Metrics...)
	allMetrics = append(allMetrics, custom.Metrics...)
	return WorkloadMetrics{Metrics: allMetrics}
}
// sharedLabels returns the subset of labels whose keys appear in
// sharedSystemMetrics; these are propagated onto the other metric families.
func sharedLabels(labels map[string]string) map[string]string {
	result := map[string]string{}
	for key, val := range labels {
		if !slices.Contains(sharedSystemMetrics, key) {
			continue
		}
		result[key] = val
	}
	return result
}
// appendLabels copies every key/value pair from src into dst, overwriting any
// keys already present in dst. dst must be a non-nil map.
func appendLabels(dst, src map[string]string) {
	for key := range src {
		dst[key] = src[key]
	}
}
/*
collectOverrideMetrics utilizes an override configuration file to collect metrics for testing
purposes. This allows sending WLM metrics without creating specific SAP setups. The override
file contains the metric type followed by all metric labels associated with that type.
Example override file contents:

	metric: system
	metric_value: 1
	os: rhel-8.4
	agent_version: 2.6
	gcloud: true
	metric: hana
	metric_value: 0
	disk_log_mount: /hana/log

Returns an empty WorkloadMetrics value if the override file cannot be read.
*/
func collectOverrideMetrics(ctx context.Context, config *cnfpb.Configuration, reader ConfigFileReader, metricOverride string) WorkloadMetrics {
	file, err := reader(metricOverride)
	if err != nil {
		log.CtxLogger(ctx).Warnw("Could not read the metric override file", "error", err)
		return WorkloadMetrics{}
	}
	defer file.Close()
	wm := WorkloadMetrics{}
	scanner := bufio.NewScanner(file)
	metricEmitter := metricEmitter{scanner, ""}
	// Emit one time series per metric entry until the file is exhausted.
	for {
		metricName, metricValue, labels, last := metricEmitter.getMetric(ctx)
		if metricName != "" {
			wm.Metrics = append(wm.Metrics, createTimeSeries(metricTypePrefix+metricName, labels, metricValue, config)...)
		}
		if last {
			break
		}
	}
	return wm
}
// getMetric reads one metric entry (name, value, and labels) from the override
// file scanner.
//
// The final bool is true when the scanner is exhausted and false when another
// "metric:" line was encountered; in the latter case the new name is stashed
// in tmpMetricName so the next call resumes with it.
func (e *metricEmitter) getMetric(ctx context.Context) (string, float64, map[string]string, bool) {
	metricName := e.tmpMetricName
	metricValue := 0.0
	labels := make(map[string]string)
	var err error
	for e.scanner.Scan() {
		key, value, found := parseScannedText(ctx, e.scanner.Text())
		if !found {
			continue
		}
		switch key {
		case "metric":
			// A new "metric:" line terminates the current entry; remember the
			// upcoming name and report that more content remains.
			if metricName != "" {
				e.tmpMetricName = value
				return metricName, metricValue, labels, false
			}
			metricName = value
		case "metric_value":
			// On parse failure the value stays at its default of 0.0.
			if metricValue, err = strconv.ParseFloat(value, 64); err != nil {
				log.CtxLogger(ctx).Warnw("Failed to parse float", "value", value, "error", err)
			}
		default:
			labels[key] = strings.TrimSpace(value)
		}
	}
	if err = e.scanner.Err(); err != nil {
		log.CtxLogger(ctx).Warnw("Could not read from the override metrics file", "error", err)
	}
	return metricName, metricValue, labels, true
}
// parseScannedText extracts a key and value pair from a scanned line of text.
//
// The expected format for the text string is: '<key>: <value>'. Blank lines
// and '#' comment lines are skipped (found is false). Whitespace around both
// key and value is trimmed.
func parseScannedText(ctx context.Context, text string) (key, value string, found bool) {
	// Ignore empty lines and comments.
	if len(text) == 0 || strings.HasPrefix(text, "#") {
		return "", "", false
	}
	k, v, ok := strings.Cut(text, ":")
	if !ok {
		log.CtxLogger(ctx).Warnw("Could not parse key, value pair. Expected format: '<key>: <value>'", "text", text)
	}
	return strings.TrimSpace(k), strings.TrimSpace(v), ok
}
// sendMetrics performs the request(s) to write collected metric data.
//
// There are two workflows for sending metric data, determined by the
// send_to_cloud_monitoring configuration flag. The default workflow involves
// sending a write request to Data Warehouse. The legacy workflow involves
// storing metrics as time series data in Cloud Monitoring, with a secondary
// write to Data Warehouse that is not taken into consideration for the
// overall success or failure of the function.
//
// Returns the number of metrics sent, or 0 when nothing was sent or the
// primary write failed.
func sendMetrics(ctx context.Context, params sendMetricsParams) int {
	if len(params.wm.Metrics) == 0 {
		log.CtxLogger(ctx).Info("No Workload Manager metrics to send")
		return 0
	}
	logMetricData(ctx, params.wm.Metrics)
	if params.sendToCloudMonitoring {
		sentMetrics := len(params.wm.Metrics)
		log.CtxLogger(ctx).Infow("Sending metrics to Cloud Monitoring...", "number", len(params.wm.Metrics))
		request := mpb.CreateTimeSeriesRequest{
			Name:       fmt.Sprintf("projects/%s", params.cp.GetProjectId()),
			TimeSeries: params.wm.Metrics,
		}
		if err := cloudmonitoring.CreateTimeSeriesWithRetry(ctx, params.timeSeriesCreator, &request, params.backOffIntervals); err != nil {
			log.CtxLogger(ctx).Errorw("Failed to send metrics to Cloud Monitoring", "error", err)
			usagemetrics.Error(usagemetrics.WLMMetricCollectionFailure)
			sentMetrics = 0
		} else {
			log.CtxLogger(ctx).Infow("Sent metrics to Cloud Monitoring", "number", len(params.wm.Metrics))
		}
		// Best-effort secondary write; does not affect the returned count.
		sendMetricsToDataWarehouseSecondary(ctx, params)
		return sentMetrics
	}
	return sendMetricsToDataWarehouse(ctx, params)
}
// sendMetricsToDataWarehouse initiates the Data Warehouse write insight request.
//
// Returns the number of metrics sent, or 0 if the write failed.
func sendMetricsToDataWarehouse(ctx context.Context, params sendMetricsParams) int {
	sentMetrics := len(params.wm.Metrics)
	log.CtxLogger(ctx).Infow("Sending metrics to Data Warehouse...", "number", len(params.wm.Metrics))
	// Send request to regional endpoint. Ex: "us-central1-a" -> "us-central1"
	location := params.cp.GetZone()
	if params.bareMetal {
		location = params.cp.GetRegion()
	}
	// Guard the split: a malformed location with fewer than two '-'-separated
	// parts would otherwise cause an index-out-of-range panic.
	if locationParts := strings.Split(location, "-"); len(locationParts) >= 2 {
		location = strings.Join(locationParts[0:2], "-")
	}
	req := createWriteInsightRequest(params.wm, params.cp)
	if err := params.wlmService.WriteInsight(params.cp.GetProjectId(), location, req); err != nil {
		log.CtxLogger(ctx).Errorw("Failed to send metrics to Data Warehouse", "error", err)
		usagemetrics.Error(usagemetrics.WLMMetricCollectionFailure)
		sentMetrics = 0
	} else {
		log.CtxLogger(ctx).Infow("Sent metrics to Data Warehouse", "number", len(params.wm.Metrics))
	}
	return sentMetrics
}
// sendMetricsToDataWarehouseSecondary initiates the Data Warehouse write
// insight request. However, the log level has been lowered to DEBUG, and there
// is no usagemetrics logging if the request fails.
//
// Returns the number of metrics sent, or 0 if the write failed.
func sendMetricsToDataWarehouseSecondary(ctx context.Context, params sendMetricsParams) int {
	sentMetrics := len(params.wm.Metrics)
	log.CtxLogger(ctx).Debugw("Sending metrics to Data Warehouse...", "number", len(params.wm.Metrics))
	// Send request to regional endpoint. Ex: "us-central1-a" -> "us-central1"
	location := params.cp.GetZone()
	if params.bareMetal {
		location = params.cp.GetRegion()
	}
	// Guard the split: a malformed location with fewer than two '-'-separated
	// parts would otherwise cause an index-out-of-range panic.
	if locationParts := strings.Split(location, "-"); len(locationParts) >= 2 {
		location = strings.Join(locationParts[0:2], "-")
	}
	req := createWriteInsightRequest(params.wm, params.cp)
	if err := params.wlmService.WriteInsight(params.cp.GetProjectId(), location, req); err != nil {
		log.CtxLogger(ctx).Debugw("Failed to send metrics to Data Warehouse", "error", err)
		sentMetrics = 0
	} else {
		log.CtxLogger(ctx).Debugw("Sent metrics to Data Warehouse", "number", len(params.wm.Metrics))
	}
	return sentMetrics
}
func createTimeSeries(t string, l map[string]string, v float64, c *cnfpb.Configuration) []*mrpb.TimeSeries {
now := ×tamppb.Timestamp{
Seconds: now(),
}
p := timeseries.Params{
BareMetal: c.BareMetal,
CloudProp: protostruct.ConvertCloudPropertiesToStruct(c.CloudProperties),
MetricType: t,
MetricLabels: l,
Timestamp: now,
Float64Value: v,
}
return []*mrpb.TimeSeries{timeseries.BuildFloat64(p)}
}
// createWriteInsightRequest converts a WorkloadMetrics time series into a WriteInsightRequest.
//
// Each time series is mapped to a validation detail: the metric type selects
// the SAP validation category, the first point's value (> 0) determines
// IsPresent, and the metric labels are carried over as details.
func createWriteInsightRequest(wm WorkloadMetrics, cp *ipb.CloudProperties) *dwpb.WriteInsightRequest {
	validations := []*dwpb.SapValidation_ValidationDetail{}
	for _, m := range wm.Metrics {
		t := dwpb.SapValidation_SAP_VALIDATION_TYPE_UNSPECIFIED
		switch m.GetMetric().GetType() {
		case sapValidationSystem:
			t = dwpb.SapValidation_SYSTEM
		case sapValidationCorosync:
			t = dwpb.SapValidation_COROSYNC
		case sapValidationHANA:
			t = dwpb.SapValidation_HANA
		case sapValidationNetweaver:
			t = dwpb.SapValidation_NETWEAVER
		case sapValidationPacemaker:
			t = dwpb.SapValidation_PACEMAKER
		case sapValidationHANASecurity:
			t = dwpb.SapValidation_HANA_SECURITY
		case sapValidationCustom:
			t = dwpb.SapValidation_CUSTOM
		}
		// A metric with no points is reported as not present.
		v := false
		if len(m.GetPoints()) > 0 {
			v = m.GetPoints()[0].GetValue().GetDoubleValue() > 0
		}
		validations = append(validations, &dwpb.SapValidation_ValidationDetail{
			SapValidationType: t,
			IsPresent:         v,
			Details:           m.GetMetric().GetLabels(),
		})
	}
	return &dwpb.WriteInsightRequest{
		Insight: &dwpb.Insight{
			InstanceId: cp.GetInstanceId(),
			SapValidation: &dwpb.SapValidation{
				ProjectId:         cp.GetProjectId(),
				Zone:              cp.GetZone(),
				ValidationDetails: validations,
			},
		},
		AgentVersion: configuration.AgentVersion,
	}
}
// logMetricData prints collected metric information to the agent log file.
func logMetricData(ctx context.Context, metrics []*mrpb.TimeSeries) {
	logger := log.CtxLogger(ctx)
	for _, ts := range metrics {
		metricType := ts.GetMetric().GetType()
		points := ts.GetPoints()
		if len(points) == 0 {
			logger.Debugw("Metric has no point data", "metric", metricType)
			continue
		}
		logger.Debugw("Metric", "metric", metricType, "value", points[0].GetValue().GetDoubleValue())
		// Maps in Go are inherently unordered. Sort the keys to ensure consistent logging output.
		labels := ts.GetMetric().GetLabels()
		keys := make([]string, 0, len(labels))
		for key := range labels {
			keys = append(keys, key)
		}
		sort.Strings(keys)
		for _, key := range keys {
			logger.Debugw(" Label", "key", key, "value", labels[key])
		}
	}
}