internal/processmetrics/networkstats/networkstats.go (235 lines of code) (raw):
/*
Copyright 2023 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package networkstats is responsible for collection of TCP network stats
// under /sap/networkstats/
package networkstats
import (
"context"
"fmt"
"path"
"regexp"
"strconv"
"strings"
"github.com/cenkalti/backoff/v4"
"github.com/GoogleCloudPlatform/sapagent/internal/utils/protostruct"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/cloudmonitoring"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/commandlineexecutor"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/timeseries"
mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
tspb "google.golang.org/protobuf/types/known/timestamppb"
cnfpb "github.com/GoogleCloudPlatform/sapagent/protos/configuration"
)
// Properties struct contains the parameters necessary for networkstats package common methods.
type (
Properties struct {
Executor commandlineexecutor.Execute
Config *cnfpb.Configuration
Client cloudmonitoring.TimeSeriesCreator
PMBackoffPolicy backoff.BackOffContext
SkippedMetrics map[string]bool
}
metricVal struct {
val any
Type string
}
)
var (
metricRe = regexp.MustCompile(`(\w*:[a-zA-Z\d,\/.]*)|(\w+\s\d+[a-zA-Z.\d]*)`)
requiredFloatMetrics = []string{"rtt", "rcv_rtt"}
requiredIntMetrics = []string{"rto", "bytes_acked", "bytes_received", "lastsnd", "lastrcv"}
)
const (
metricURL = "workload.googleapis.com"
nwStatsPath = "/sap/networkstats"
)
/*
Collect is an implementation of Collector interface defined in processmetrics.go.
Collect method collects network metrics, logs errors if it encounters
any and returns the collected metrics with the last error encountered while collecting metrics.
*/
func (p *Properties) Collect(ctx context.Context) ([]*mrpb.TimeSeries, error) {
var floatMetrics, intMetrics []string
for _, metric := range requiredFloatMetrics {
if p.SkippedMetrics[path.Join(nwStatsPath, metric)] {
log.CtxLogger(ctx).Debug("Skipping collection of networkstats metric:", metric)
continue
}
floatMetrics = append(floatMetrics, metric)
}
for _, metric := range requiredIntMetrics {
if p.SkippedMetrics[path.Join(nwStatsPath, metric)] {
log.CtxLogger(ctx).Debug("Skipping collection of networkstats metric:", metric)
continue
}
intMetrics = append(intMetrics, metric)
}
if len(floatMetrics) == 0 && len(intMetrics) == 0 {
log.CtxLogger(ctx).Debug("Skipping collection of all networkstats metrics")
return nil, nil
}
log.CtxLogger(ctx).Debug("Collecting networkstats metrics")
pid := p.fetchPID(ctx)
if pid == "" {
return nil, fmt.Errorf("could not fetch pid of hdbnameserver")
}
socket, err := p.fetchHDBSocket(ctx)
if err != nil {
return nil, err
}
socket = strings.ReplaceAll(socket, "0.0.0.0", "*")
out := p.fetchSSOutput(ctx, socket)
log.CtxLogger(ctx).Debug("Socket: ", socket, "Output: ", out, "pid: ", pid)
metricList := parseSSOutput(ctx, out)
if len(metricList) == 0 {
return nil, fmt.Errorf("could not fetch TCP connection Metrics")
}
ssValues := mapValues(metricList)
floats, err := p.createTSList(ctx, pid, floatMetrics, ssValues, "float64")
if err != nil {
return nil, err
}
ints, err := p.createTSList(ctx, pid, intMetrics, ssValues, "int64")
if err != nil {
return nil, err
}
return append(floats, ints...), nil
}
// fetchPID fetches the pid of hdbnameserver.
func (p *Properties) fetchPID(ctx context.Context) string {
result := p.Executor(ctx, commandlineexecutor.Params{
Executable: "pidof",
ArgsToSplit: "hdbnameserver",
})
return result.StdOut
}
// fetchHDBSocket fetches the listening socket opened by hdbnameserver.
func (p *Properties) fetchHDBSocket(ctx context.Context) (string, error) {
// This is a single line command to fetch LISTENING ports used by hdbnameserver
// using lsof command
// Note: Backticks in grep -Eo `<regularExp>` get replaced by single quotes
// as explained in commandlineexecutor.go
argsToSplit := "-c 'sudo lsof -nP -p $(pidof hdbnameserver) | grep LISTEN | grep -v 127.0.0.1 | grep -Eo `(([0-9]{1,3}\\.){1,3}[0-9]{1,3})|(\\*)\\:[0-9]{3,5}`'"
result := p.Executor(ctx, commandlineexecutor.Params{
Executable: "bash",
ArgsToSplit: argsToSplit,
})
if !strings.Contains(result.StdErr, "command not found") {
log.CtxLogger(ctx).Debug("Fetched listening sockets for hdbnameserver using lsof command")
return strings.Split(result.StdOut, "\n")[0], nil
}
// This is a single line command to fetch LISTENING ports used by hdbnameserver
// using ss command
argsToSplit = "-c 'sudo ss -lp | grep $(pidof hdbnameserver) | grep -v 127.0.0.1 | grep -Eo `((([0-9]{1,3}\\.){1,3}[0-9]{1,3})|(\\*))\\:[0-9]{3,5}`'"
result = p.Executor(ctx, commandlineexecutor.Params{
Executable: "bash",
ArgsToSplit: argsToSplit,
})
if !strings.Contains(result.StdErr, "command not found") {
socket := strings.Split(result.StdOut, "\n")[0]
// Replacing '0.0.0.0' with wildcard '*' as '*' listens to all available
// interfaces on both ipv6 and ipv4 addresses, whereas 0.0.0.0 listens to
// all interfaces on ipv4 addresses.
// 'lsof' command is more generic than 'ss'. Generally, 0.0.0.0 and * are equivalent,
// leading to higher chances of TCP metrics being fetched.
socket = strings.ReplaceAll(socket, "0.0.0.0", "*")
log.CtxLogger(ctx).Debug("Fetched listening sockets for hdbnameserver using ss command")
return socket, nil
}
log.CtxLogger(ctx).Debugw("Could not fetch listening sockets for hdbnameserver", "error", result.StdErr)
return "", result.Error
}
// fetchSSOutput fetches the output of ss command for the given socket.
func (p *Properties) fetchSSOutput(ctx context.Context, socket string) string {
result := p.Executor(ctx, commandlineexecutor.Params{
Executable: "bash",
ArgsToSplit: "-c 'echo ss -tin src " + socket + " | sh'",
})
return result.StdOut
}
// mapValues creates a map of values from given metric list.
// Sample input:
// metricList: ["wscale:7,7", "rto:204", "rtt:0.017/0.008", "send 154202352941bps", "lastsnd:28", "lastrcv:28", "lastack:28", "pacing_rate 306153576640bps"]
// Sample output:
// ssMap: map["wscale": "7", "rto": "204", "rtt": "0.017/0.008", "send": "154202352941bps", "lastsnd": "28", "lastrcv": "28", "lastack": "28", "pacing_rate": "306153576640bps"]
func mapValues(metrics []string) map[string]string {
ssValues := make(map[string]string)
for _, metric := range metrics {
k, v, ok := strings.Cut(metric, ":")
if !ok {
k, v, ok = strings.Cut(metric, " ")
if !ok {
log.Logger.Debugw("Could not find a whitespace or colon separator in ss metrics received for metric:", metric)
continue
}
}
if len(v) == 0 {
log.Logger.Debugw("Empty value for metric:", metric)
continue
}
ssValues[k] = v
}
return ssValues
}
// createTSList creates a slice of timeseries metrics according to the required metric values
// It returns this slice along with an error which could possibly be non-nil.
// Some error could occur in collection of one individual metric.
func (p *Properties) createTSList(ctx context.Context, pid string, reqMetrics []string, ssMap map[string]string, t string) ([]*mrpb.TimeSeries, error) {
var metrics []*mrpb.TimeSeries
for _, metric := range reqMetrics {
if _, ok := ssMap[metric]; !ok {
log.CtxLogger(ctx).Debug("Metric skipped, could not find metric: ", metric)
continue
}
var val any
var err error
if t == "float64" {
if metric == "rtt" {
val, err = strconv.ParseFloat(strings.Split(ssMap[metric], "/")[0], 64)
} else {
val, err = strconv.ParseFloat(ssMap[metric], 64)
}
} else {
val, err = strconv.ParseInt(ssMap[metric], 10, 64)
}
if err != nil {
log.CtxLogger(ctx).Debugw("error in parsing value", "could not convert value to type:", t, "metric:", metric, "Val: ", ssMap[metric], "err: ", err)
return nil, err
}
ssMetrics := p.collectTCPMetrics(ctx, metric, pid, metricVal{val, t})
if ssMetrics != nil {
metrics = append(metrics, ssMetrics...)
}
}
return metrics, nil
}
// CollectWithRetry decorates the Collect method with retry mechanism.
func (p *Properties) CollectWithRetry(ctx context.Context) ([]*mrpb.TimeSeries, error) {
attempt := 1
var res []*mrpb.TimeSeries
err := backoff.Retry(func() error {
select {
case <-ctx.Done():
log.CtxLogger(ctx).Debugw("Context cancelled, exiting CollectWithRetry")
return nil
default:
var err error
res, err = p.Collect(ctx)
if err != nil {
log.CtxLogger(ctx).Debugw("Error in Collection", "attempt", attempt, "error", err)
attempt++
}
return err
}
}, p.PMBackoffPolicy)
if err != nil {
log.CtxLogger(ctx).Infow("Retry limit exceeded", "error", err)
}
return res, err
}
// collectTCPMetrics collects TCP connection metrics.
func (p *Properties) collectTCPMetrics(ctx context.Context, metric, pid string, data metricVal) []*mrpb.TimeSeries {
labels := map[string]string{
"name": metric,
"process": "hdbnameserver",
"pid": pid,
}
return []*mrpb.TimeSeries{p.createMetric(labels, data)}
}
// createMetric creates a TimeSeries metric with given labels and values.
func (p *Properties) createMetric(labels map[string]string, data metricVal) *mrpb.TimeSeries {
metricPath := path.Join(nwStatsPath, labels["name"])
log.Logger.Debugw("Creating metric for instance", "metric", metricPath, "value", data.val, "labels", labels)
ts := timeseries.Params{
CloudProp: protostruct.ConvertCloudPropertiesToStruct(p.Config.CloudProperties),
MetricType: metricURL + metricPath,
MetricLabels: labels,
Timestamp: tspb.Now(),
BareMetal: p.Config.BareMetal,
}
switch data.Type {
case "float64":
ts.Float64Value = data.val.(float64)
return timeseries.BuildFloat64(ts)
default:
ts.Int64Value = data.val.(int64)
return timeseries.BuildInt(ts)
}
}
// parseSSOutput parses given SSOutput for a list of TCP connection metrics.
// Example ssOutput:
// 20210
// State Recv-Q Send-Q Local Address:Port Peer Address:Port
// ESTAB 0 0 127.0.0.1:30013 127.0.0.1:55494
// \t cubic wscale:7,7 rto:204 rtt:0.017/0.008 send 154202352941bps lastsnd:28 lastrcv:28 lastack:28 pacing_rate 306153576640bps delivered:3 app_limited rcv_space:65483 minrtt:0.015
// \n
//
// This function returns PID and a string slice containing these metrics.
// PID: 20210
// metricList: ["wscale:7,7", "rto:204", "rtt:0.017/0.008", "send 154202352941bps", "lastsnd:28", "lastrcv:28", "lastack:28", "pacing_rate 306153576640bps", "delivered:3", "rcv_space:65483", "minrtt:0.015"]
func parseSSOutput(ctx context.Context, ssOutput string) []string {
var metrics []string
out := strings.Split(ssOutput, "\n")
// Checking if TCP connection metrics has been found or not
if len(out) <= 3 {
log.CtxLogger(ctx).Debug("TCP connection metrics not received")
return metrics
}
metricString := out[2]
// TCP SS metrics received using ss command presents the metrics in two ways:
// rtt:0.027/0.23, OR
// pacing_rate 306153576640bps
// metricRe matches for substrings following either of these display patterns
// A string slice of these metrics gets generated: ["rtt:0.026,0.23", "pacing_rate 306153576640bps"]
metrics = metricRe.FindAllString(metricString, -1)
log.CtxLogger(ctx).Debug("Metric List: ", metrics)
return metrics
}