internal/processmetrics/computeresources/computeresources.go (308 lines of code) (raw):
/*
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package computeresources provides code for collection of compute resources metrics
// like CPU and memory per process for various Hana, Netweaver and SAP Control Processes.
package computeresources
import (
"context"
"fmt"
"regexp"
"runtime"
"strconv"
"strings"
"github.com/shirou/gopsutil/v3/process"
"github.com/GoogleCloudPlatform/sapagent/internal/processmetrics/sapcontrol"
"github.com/GoogleCloudPlatform/sapagent/internal/utils/protostruct"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/cloudmonitoring"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/commandlineexecutor"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/timeseries"
mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
tspb "google.golang.org/protobuf/types/known/timestamppb"
cnfpb "github.com/GoogleCloudPlatform/sapagent/protos/configuration"
sapb "github.com/GoogleCloudPlatform/sapagent/protos/sapapp"
)
// Enum for choosing the metric type to collect.
// The value is passed as metricType to collectTimeSeriesMetrics and
// buildMetricLabel to select which kind of per-process metric is handled.
const (
// collectCPUMetric selects per-process CPU utilization.
collectCPUMetric = iota
// collectMemoryMetric selects per-process memory usage (VmSize, VmRSS, VmSwap).
collectMemoryMetric
// collectDiskIOPSMetric selects per-process disk read/write rates.
collectDiskIOPSMetric
)
const (
// metricURL is the domain prefix that is concatenated with a metric path
// to form the full metric type of a time series.
metricURL = "workload.googleapis.com"
// linuxProcStatPath and linuxMemoryStatusFilePath are procfs path templates.
// NOTE(review): "PID" is presumably a placeholder that is substituted with
// the numeric process ID by code outside this view — confirm at call sites.
linuxProcStatPath = "/proc/PID/stat"
linuxMemoryStatusFilePath = "/proc/PID/status"
// thousand is the divisor applied to IO byte-count deltas.
thousand = 1000.0
// million is the divisor applied to memory byte counts to report megabytes.
million = 1000000.0
)
var (
// memoryTypeRegexList holds patterns that each match one whole line
// (including the surrounding newlines) starting with VmSize:, VmRSS: or VmSwap:.
// NOTE(review): presumably applied to the content of /proc/PID/status — confirm.
memoryTypeRegexList = []string{`\nVmSize:.*\n`, `\nVmRSS:.*\n`, `\nVmSwap:.*\n`}
// multiSpaceChars matches a run of one or more whitespace characters.
multiSpaceChars = regexp.MustCompile(`\s+`)
// newlineChars matches a single newline character.
newlineChars = regexp.MustCompile(`\n`)
// forwardSlashChar matches a single forward slash.
forwardSlashChar = regexp.MustCompile(`\/`)
// dashChars matches a single dash (hyphen).
dashChars = regexp.MustCompile(`\-`)
)
// Parameters carries the dependencies and settings shared by the
// collection routines of the computeresources package.
type Parameters struct {
	// executor runs operating system commands (for example `ps`).
	executor commandlineexecutor.Execute
	// Config is the agent configuration; cloud properties, the bare metal
	// flag and the process metrics frequency are read from it.
	Config *cnfpb.Configuration
	// client creates time series in Cloud Monitoring.
	// NOTE(review): not referenced in this part of the file.
	client cloudmonitoring.TimeSeriesCreator
	// cpuMetricPath is appended to metricURL for CPU time series.
	cpuMetricPath string
	// memoryMetricPath is appended to metricURL for memory time series.
	memoryMetricPath string
	// iopsReadsMetricPath is appended to metricURL for disk read time series.
	iopsReadsMetricPath string
	// iopsWritesMetricPath is appended to metricURL for disk write time series.
	iopsWritesMetricPath string
	// SAPInstance identifies the instance whose processes are measured; its
	// SID and instance number become metric labels when it is set.
	SAPInstance *sapb.SAPInstance
	// NewProc optionally overrides how process handles are created; when it
	// is nil the gopsutil constructor is used.
	NewProc NewProcessWithContextHelper
	// getProcessListParams and getABAPWPTableParams are command parameters.
	// NOTE(review): not referenced in this part of the file.
	getProcessListParams commandlineexecutor.Params
	getABAPWPTableParams commandlineexecutor.Params
	// SAPControlClient is the client used for sapcontrol web method calls.
	SAPControlClient sapcontrol.ClientInterface
	// LastValue remembers the most recent IO counters per process, keyed by
	// "<name>:<pid>", so that deltas can be computed between collections.
	LastValue map[string]*process.IOCountersStat
}

// NewProcessWithContextHelper is a strategy which creates a new process type
// from PSUtil library using the provided context and PID.
type NewProcessWithContextHelper func(context.Context, int32) (UsageReader, error)

// UsageReader abstracts the PSUtil methods used to read CPU percentage,
// memory usage and IO counters of a process so that they can be replaced
// in unit tests.
type UsageReader interface {
	CPUPercentWithContext(context.Context) (float64, error)
	MemoryInfoWithContext(context.Context) (*process.MemoryInfoStat, error)
	IOCountersWithContext(context.Context) (*process.IOCountersStat, error)
}

// ProcessInfo identifies a process by its name and PID.
type ProcessInfo struct {
	Name string
	// PID is kept as a string and parsed by the collection routines.
	PID string
}

// MemoryUsage groups the memory metrics collected for one process.
type MemoryUsage struct {
	VMS  *Metric
	RSS  *Metric
	Swap *Metric
}

// DiskUsage groups the disk read and write metrics collected for one process.
type DiskUsage struct {
	DeltaReads  *Metric
	DeltaWrites *Metric
}

// Metric is a single measured value together with the process it belongs
// to and the time at which it was taken.
type Metric struct {
	Value       float64
	ProcessInfo *ProcessInfo
	TimeStamp   *tspb.Timestamp
}
// newProc returns a process handle for pid. A non-nil fn takes precedence,
// which lets tests inject a fake; otherwise the gopsutil constructor is used.
func newProc(ctx context.Context, fn NewProcessWithContextHelper, pid int32) (UsageReader, error) {
	if fn != nil {
		return fn(ctx, pid)
	}
	return process.NewProcessWithContext(ctx, pid)
}
// collectControlProcesses lists the SAP control processes running on the host.
//
// It runs `ps -e -o comm,pid` through p.executor and returns one ProcessInfo
// for every output line whose command name starts with "sapstart". If the
// command fails, the error is logged and nil is returned. Lines that do not
// consist of exactly a name and a PID are logged and skipped.
//
// The output is parsed line by line rather than with a `\nsapstart.*\n`
// pattern: that pattern consumes the newline separating two lines, so the
// second of two adjacent matching lines could never be found.
func collectControlProcesses(ctx context.Context, p Parameters) []*ProcessInfo {
	const (
		cmd        = "ps"
		args       = "-e -o comm,pid"
		namePrefix = "sapstart"
	)
	result := p.executor(ctx, commandlineexecutor.Params{
		Executable:  cmd,
		ArgsToSplit: args,
	})
	if result.Error != nil {
		log.CtxLogger(ctx).Debugw("Error while executing command", "command", cmd, "args", args, "error", result.Error)
		return nil
	}

	var processInfos []*ProcessInfo
	for _, line := range strings.Split(result.StdOut, "\n") {
		if !strings.HasPrefix(line, namePrefix) {
			continue
		}
		// ps pads the columns with a variable amount of whitespace:
		// `sapstartsrv    8921` -> ["sapstartsrv", "8921"].
		fields := strings.Fields(line)
		if len(fields) != 2 {
			log.CtxLogger(ctx).Debugw("Could not parse output", "command", cmd+" "+args, "line", line)
			continue
		}
		processInfos = append(processInfos, &ProcessInfo{Name: fields[0], PID: fields[1]})
	}
	return processInfos
}
// CollectProcessesForInstance returns the processes that belong to the SAP
// instance in p.SAPInstance, or nil when no instance is set.
//
// The result combines the ABAP work processes reported by the ABAPGetWPTable
// web method with the processes reported by GetProcessList. A failure of
// either call is logged and only removes that call's contribution.
func CollectProcessesForInstance(ctx context.Context, p Parameters) []*ProcessInfo {
	if p.SAPInstance == nil {
		log.CtxLogger(ctx).Debug("Error getting ProcessList in computeresources, no sapInstance set.")
		return nil
	}

	props := &sapcontrol.Properties{Instance: p.SAPInstance}
	client := p.SAPControlClient

	procList, listErr := props.GetProcessList(ctx, client)
	if listErr != nil {
		log.CtxLogger(ctx).Debug("Error performing GetProcessList web method in computeresources", log.Error(listErr))
	}

	var infos []*ProcessInfo
	wpTable, wpErr := props.ABAPGetWPTable(ctx, client)
	if wpErr == nil {
		// NOTE(review): despite the field name ProcessNameToPID, the map key
		// is used as the PID and the value as the process name — confirm
		// against the sapcontrol package.
		for pid, name := range wpTable.ProcessNameToPID {
			infos = append(infos, &ProcessInfo{Name: name, PID: pid})
		}
	} else {
		log.CtxLogger(ctx).Debugw("Error getting ABAP processes from ABAPGetWPTable web method", log.Error(wpErr))
	}

	for _, entry := range procList {
		infos = append(infos, &ProcessInfo{Name: entry.Name, PID: entry.PID})
	}
	return infos
}
// collectTimeSeriesMetrics gathers the requested kind of metric (CPU, memory
// or disk IOPS) for every entry in processes and converts each sample into a
// time series.
//
// A collection error does not discard the samples that were gathered: the
// series built from them are returned together with the error. An unknown
// metricType yields no series and an error.
func collectTimeSeriesMetrics(ctx context.Context, p Parameters, processes []*ProcessInfo, metricType int) ([]*mrpb.TimeSeries, error) {
	var (
		series []*mrpb.TimeSeries
		retErr error
	)
	// emit appends the time series for one sample. The samples handed to it
	// come straight from the Collect* functions, which only return non-nil
	// entries, so buildMetricLabel never returns nil here.
	emit := func(path, memType string, sample *Metric) {
		series = append(series, createMetrics(path, buildMetricLabel(metricType, memType, sample), sample.Value, p))
	}

	switch metricType {
	case collectCPUMetric:
		samples, err := CollectCPUPerProcess(ctx, p, processes)
		retErr = err
		for _, sample := range samples {
			emit(p.cpuMetricPath, "", sample)
		}
	case collectMemoryMetric:
		samples, err := CollectMemoryPerProcess(ctx, p, processes)
		retErr = err
		for _, sample := range samples {
			// One series per memory type, told apart by the memType label.
			emit(p.memoryMetricPath, "VmSize", sample.VMS)
			emit(p.memoryMetricPath, "VmRSS", sample.RSS)
			emit(p.memoryMetricPath, "VmSwap", sample.Swap)
		}
	case collectDiskIOPSMetric:
		samples, err := CollectIOPSPerProcess(ctx, p, processes)
		retErr = err
		for _, sample := range samples {
			emit(p.iopsReadsMetricPath, "", sample.DeltaReads)
			emit(p.iopsWritesMetricPath, "", sample.DeltaWrites)
		}
	default:
		retErr = fmt.Errorf("Invalid metric type: %v", metricType)
	}
	return series, retErr
}
// buildMetricLabel returns the labels for one metric sample, or nil when
// metric is nil.
//
// Every sample gets a "process" label derived from the process name and PID.
// Memory samples additionally get a "memType" label set to metricLabel
// (VmSize, VmRSS or VmSwap); metricLabel is ignored for other metric types.
func buildMetricLabel(metricType int, metricLabel string, metric *Metric) map[string]string {
	if metric == nil {
		return nil
	}
	info := metric.ProcessInfo
	labels := map[string]string{"process": FormatProcessLabel(info.Name, info.PID)}
	if metricType != collectMemoryMetric {
		return labels
	}
	labels["memType"] = metricLabel
	return labels
}
// CollectCPUPerProcess collects the CPU utilization of each given process.
//
// The percentage reported for a process is divided by the number of logical
// CPUs. Processes whose PID cannot be parsed are skipped silently; processes
// that cannot be opened or read are skipped and the most recent such error
// is returned alongside the samples that were collected.
func CollectCPUPerProcess(ctx context.Context, p Parameters, processes []*ProcessInfo) ([]*Metric, error) {
	var (
		samples []*Metric
		lastErr error
	)
	cores := float64(runtime.NumCPU())
	for _, info := range processes {
		pidNum, convErr := strconv.Atoi(info.PID)
		if convErr != nil {
			log.CtxLogger(ctx).Debugw("Could not parse PID", "pid", info.PID, "process", info.Name, "error", convErr)
			continue
		}
		reader, procErr := newProc(ctx, p.NewProc, int32(pidNum))
		if procErr != nil {
			log.CtxLogger(ctx).Debugw("Could not create process", "pid", pidNum, "process", info.Name, "error", procErr)
			lastErr = procErr
			continue
		}
		percent, cpuErr := reader.CPUPercentWithContext(ctx)
		if cpuErr != nil {
			log.CtxLogger(ctx).Debugw("Could not get process CPU stats", "pid", pidNum, "error", cpuErr)
			lastErr = cpuErr
			continue
		}
		sample := &Metric{
			ProcessInfo: info,
			Value:       percent / cores,
			TimeStamp:   tspb.Now(),
		}
		samples = append(samples, sample)
	}
	return samples, lastErr
}
// CollectMemoryPerProcess collects the virtual, resident and swap memory of
// each given process, reported in megabytes (value divided by one million).
//
// Processes whose PID cannot be parsed are skipped silently; processes that
// cannot be opened or read are skipped and the most recent such error is
// returned alongside the samples that were collected.
func CollectMemoryPerProcess(ctx context.Context, p Parameters, processes []*ProcessInfo) ([]*MemoryUsage, error) {
	var (
		usages  []*MemoryUsage
		lastErr error
	)
	for _, info := range processes {
		pidNum, convErr := strconv.Atoi(info.PID)
		if convErr != nil {
			log.CtxLogger(ctx).Debugw("Could not parse PID", "pid", info.PID, "process", info.Name, "error", convErr)
			continue
		}
		reader, procErr := newProc(ctx, p.NewProc, int32(pidNum))
		if procErr != nil {
			log.CtxLogger(ctx).Debugw("Could not create process", "pid", pidNum, "process", info.Name, "error", procErr)
			lastErr = procErr
			continue
		}
		stat, memErr := reader.MemoryInfoWithContext(ctx)
		if memErr != nil {
			log.CtxLogger(ctx).Debugw("Could not get process memory stats", "pid", pidNum, "error", memErr)
			lastErr = memErr
			continue
		}
		// inMB wraps a raw counter as a megabyte sample for this process.
		inMB := func(raw uint64) *Metric {
			return &Metric{ProcessInfo: info, Value: float64(raw) / million, TimeStamp: tspb.Now()}
		}
		usages = append(usages, &MemoryUsage{
			VMS:  inMB(stat.VMS),
			RSS:  inMB(stat.RSS),
			Swap: inMB(stat.Swap),
		})
	}
	return usages, lastErr
}
// CollectIOPSPerProcess collects disk read and write rates per process from
// the gopsutil IO counters.
//
// For every process the bytes read and written since the previous call are
// computed from p.LastValue (keyed by "<name>:<pid>"), divided by one
// thousand and by the configured process metrics frequency. p.LastValue is
// updated with the current counters on every call.
//
// No sample is produced for a process when:
//   - its PID cannot be parsed (skipped silently),
//   - the process cannot be opened or its counters cannot be read (the most
//     recent such error is returned),
//   - there is no previous value yet (the current one becomes the baseline),
//   - a counter is lower than its previous value, for example after the PID
//     was reused; the unsigned subtraction would otherwise wrap around to a
//     huge bogus delta, so the baseline is reset instead.
//
// An error is returned instead of panicking when p.LastValue is nil, since
// no baseline can be stored in that case.
func CollectIOPSPerProcess(ctx context.Context, p Parameters, processes []*ProcessInfo) ([]*DiskUsage, error) {
	var diskUsages []*DiskUsage
	var metricsCollectionErr error
	// NOTE(review): a frequency of 0 makes the divisions below yield +Inf or
	// NaN; callers are assumed to configure a positive frequency — confirm.
	freq := float64(p.Config.GetCollectionConfiguration().GetProcessMetricsFrequency())
	for _, processInfo := range processes {
		pid, err := strconv.Atoi(processInfo.PID)
		if err != nil {
			log.CtxLogger(ctx).Debugw("Could not parse PID", "pid", processInfo.PID, "process", processInfo.Name, "error", err)
			continue
		}
		proc, err := newProc(ctx, p.NewProc, int32(pid))
		if err != nil {
			log.CtxLogger(ctx).Debugw("Could not create process", "pid", pid, "process", processInfo.Name, "error", err)
			metricsCollectionErr = err
			continue
		}
		currVal, err := proc.IOCountersWithContext(ctx)
		if err != nil {
			log.CtxLogger(ctx).Debugw("Could not get process IOPS stats", "pid", pid, "process", processInfo.Name, "error", err)
			metricsCollectionErr = err
			continue
		}
		if p.LastValue == nil {
			// Writing to a nil map panics, and without a stored baseline no
			// delta can ever be computed.
			return diskUsages, fmt.Errorf("computing IOPS deltas: LastValue map is not initialized")
		}
		key := fmt.Sprintf("%s:%s", processInfo.Name, processInfo.PID)
		lastVal, ok := p.LastValue[key]
		p.LastValue[key] = currVal
		if !ok {
			log.CtxLogger(ctx).Debugw("not creating metric since last value is not updated for IOPS stats", "pid", pid)
			continue
		}
		if currVal.ReadBytes < lastVal.ReadBytes || currVal.WriteBytes < lastVal.WriteBytes {
			log.CtxLogger(ctx).Debugw("not creating metric since IO counters decreased, baseline reset for IOPS stats", "pid", pid, "process", processInfo.Name)
			continue
		}
		deltaReads := float64(currVal.ReadBytes-lastVal.ReadBytes) / thousand
		deltaWrites := float64(currVal.WriteBytes-lastVal.WriteBytes) / thousand
		diskUsages = append(diskUsages, &DiskUsage{
			DeltaReads:  &Metric{ProcessInfo: processInfo, Value: deltaReads / freq, TimeStamp: tspb.Now()},
			DeltaWrites: &Metric{ProcessInfo: processInfo, Value: deltaWrites / freq, TimeStamp: tspb.Now()},
		})
	}
	return diskUsages, metricsCollectionErr
}
// createMetrics builds a float64 time series for the metric at
// metricURL+mPath with value val.
//
// When p.SAPInstance is set, the "sid" and "instance_nr" labels are added to
// labels; the map passed in is modified in place. A nil labels map is
// replaced by a fresh one in that case, because writing to a nil map panics.
// Cloud properties and the bare metal flag are read from p.Config through
// the nil-safe getters so that a nil configuration does not panic here.
func createMetrics(mPath string, labels map[string]string, val float64, p Parameters) *mrpb.TimeSeries {
	if p.SAPInstance != nil {
		if labels == nil {
			labels = make(map[string]string, 2)
		}
		labels["sid"] = p.SAPInstance.GetSapsid()
		labels["instance_nr"] = p.SAPInstance.GetInstanceNumber()
	}

	ts := timeseries.Params{
		CloudProp:    protostruct.ConvertCloudPropertiesToStruct(p.Config.GetCloudProperties()),
		MetricType:   metricURL + mPath,
		MetricLabels: labels,
		Timestamp:    tspb.Now(),
		Float64Value: val,
		BareMetal:    p.Config.GetBareMetal(),
	}
	log.Logger.Debugw("Creating metric for instance", "metric", mPath, "value", val, "instancenumber", p.SAPInstance.GetInstanceNumber(), "labels", labels)
	return timeseries.BuildFloat64(ts)
}
// FormatProcessLabel creates a unique label for a process in the form
// "<name>:<pid>". Every forward slash and dash in pname is replaced by an
// underscore; pid is appended unchanged.
//
// Plain string replacement is used because both substitutions are for a
// single literal character, which does not need a regular expression.
func FormatProcessLabel(pname, pid string) string {
	sanitized := strings.ReplaceAll(pname, "/", "_")
	sanitized = strings.ReplaceAll(sanitized, "-", "_")
	return sanitized + ":" + pid
}