otelcollector/fluent-bit/src/process_stats.go (178 lines of code) (raw):
package main
import (
"fmt"
"maps"
"math"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/microsoft/ApplicationInsights-Go/appinsights"
stats "github.com/shirou/gopsutil/v4/process"
)
// replicasetDimensionsNameToEnvVar maps a telemetry dimension name to the name
// of the environment variable whose value is reported for that dimension when
// the otelcollector process runs under a ReplicaSet controller.
//
// Values are passed verbatim to os.Getenv, which performs no shell-style
// expansion, so every entry must be a bare variable name without a leading "$".
var replicasetDimensionsNameToEnvVar = map[string]string{
	// Container resource limits.
	"cpulimit": "CONTAINER_CPU_LIMIT",
	"memlimit": "CONTAINER_MEMORY_LIMIT",

	// Default scrape target toggles and pod annotation scraping.
	"defaultscrapekubelet":                    "AZMON_PROMETHEUS_KUBELET_SCRAPING_ENABLED",
	"defaultscrapecoreDns":                    "AZMON_PROMETHEUS_COREDNS_SCRAPING_ENABLED",
	"defaultscrapecadvisor":                   "AZMON_PROMETHEUS_CADVISOR_SCRAPING_ENABLED",
	"defaultscrapekubeproxy":                  "AZMON_PROMETHEUS_KUBEPROXY_SCRAPING_ENABLED",
	"defaultscrapeapiserver":                  "AZMON_PROMETHEUS_APISERVER_SCRAPING_ENABLED",
	"defaultscrapekubestate":                  "AZMON_PROMETHEUS_KUBESTATE_SCRAPING_ENABLED",
	"defaultscrapenodeexporter":               "AZMON_PROMETHEUS_NODEEXPORTER_SCRAPING_ENABLED",
	"defaultscrapecollectorhealth":            "AZMON_PROMETHEUS_COLLECTOR_HEALTH_SCRAPING_ENABLED",
	"defaultscrapewindowsexporter":            "AZMON_PROMETHEUS_WINDOWSEXPORTER_SCRAPING_ENABLED",
	"defaultscrapewindowskubeproxy":           "AZMON_PROMETHEUS_WINDOWSKUBEPROXY_SCRAPING_ENABLED",
	"defaultscrapepodannotations":             "AZMON_PROMETHEUS_POD_ANNOTATION_SCRAPING_ENABLED",
	"podannotationns":                         "AZMON_PROMETHEUS_POD_ANNOTATION_NAMESPACES_REGEX",
	"defaultscrapekappiebasic":                "AZMON_PROMETHEUS_KAPPIEBASIC_SCRAPING_ENABLED",
	"defaultscrapenetworkobservabilityRetina": "AZMON_PROMETHEUS_NETWORKOBSERVABILITYRETINA_SCRAPING_ENABLED",
	"defaultscrapenetworkobservabilityHubble": "AZMON_PROMETHEUS_NETWORKOBSERVABILITYHUBBLE_SCRAPING_ENABLED",
	"defaultscrapenetworkobservabilityCilium": "AZMON_PROMETHEUS_NETWORKOBSERVABILITYCILIUM_SCRAPING_ENABLED",

	// Node exporter and kube-state-metrics identification.
	"nodeexportertargetport": "NODE_EXPORTER_TARGETPORT",
	"nodeexportername":       "NODE_EXPORTER_NAME",
	"kubestatename":          "KUBE_STATE_NAME",
	"kubestateversion":       "KUBE_STATE_VERSION",
	"nodeexporterversion":    "NODE_EXPORTER_VERSION",

	// Miscellaneous collector settings.
	"akvauth":                             "AKVAUTH",
	"debugmodeenabled":                    "DEBUG_MODE_ENABLED",
	"kubestatemetriclabelsallowlist":      "KUBE_STATE_METRIC_LABELS_ALLOWLIST",
	"kubestatemetricannotationsallowlist": "KUBE_STATE_METRIC_ANNOTATIONS_ALLOWLIST",
	"httpproxyenabled":                    "HTTP_PROXY_ENABLED",
	"tadapterh":                           "tokenadapterHealthyAfterSecs",
	"tadapterf":                           "tokenadapterUnhealthyAfterSecs",
	"setGlobalSettings":                   "AZMON_SET_GLOBAL_SETTINGS",
	"globalSettingsConfigured":            "AZMON_GLOBAL_SETTINGS_CONFIGURED",
	"calias":                              "AZMON_CLUSTER_ALIAS",
	"clabel":                              "AZMON_CLUSTER_LABEL",
	"mip":                                 "MINIMAL_INGESTION_PROFILE",
	"operatormodel":                       "AZMON_OPERATOR_ENABLED",
	"operatormodelcfgmapsetting":          "AZMON_OPERATOR_ENABLED_CFG_MAP_SETTING",
	"operatormodelchartsetting":           "AZMON_OPERATOR_ENABLED_CHART_SETTING",
	"collectorHpaEnabled":                 "AZMON_COLLECTOR_HPA_ENABLED",

	// Arc extension and certificate mounts. These previously carried a
	// literal "$" prefix, which made the os.Getenv lookup search for a
	// variable whose name itself starts with "$" and therefore never match.
	"isarcextension":    "IS_ARC_EXTENSION",
	"arcdistribution":   "ARC_DISTRIBUTION",
	"mountmarinercerts": "MOUNT_MARINER_CERTS",
	"mountubuntucerts":  "MOUNT_UBUNTU_CERTS",
}
// daemonsetDimensionsNameToEnvVar maps a telemetry dimension name to the name
// of the environment variable reported for it when the otelcollector process
// runs under a DaemonSet controller.
var daemonsetDimensionsNameToEnvVar = map[string]string{
	"debugmodeenabled": "DEBUG_MODE_ENABLED",
	"memlimit":         "CONTAINER_MEMORY_LIMIT",
	"cpulimit":         "CONTAINER_CPU_LIMIT",
	"tadapterf":        "tokenadapterUnhealthyAfterSecs",
	"tadapterh":        "tokenadapterHealthyAfterSecs",
}
// Process holds the tracking handle and the samples gathered for one monitored
// process during the current aggregation period.
type Process struct {
// processName is the name the process was looked up by.
processName string
// processPID is the first PID returned by the PID lookup for processName.
processPID int32
// cpuValues holds the CPU percentage samples for the current period; it is
// re-sorted after every append and reset after each telemetry send.
cpuValues sort.Float64Slice
// memValues holds the RSS samples for the current period; it is re-sorted
// after every append and reset after each telemetry send.
memValues sort.Float64Slice
// process is the gopsutil handle used to read CPU and memory statistics.
process *stats.Process
// telemetryDimensions are the extra properties derived from environment
// variables once at initialization and copied onto the CPU metrics.
telemetryDimensions map[string]string
}
// ProcessAggregations owns the set of tracked processes and serializes access
// to their sample buffers between the collection and reporting loops.
type ProcessAggregations struct {
// processMap is keyed by process name.
processMap map[string]*Process
// mu is held while samples are collected and while they are reported and reset.
mu sync.Mutex
}
// InitProcessAggregations builds the tracking state for each named process.
//
// processName lists the executables to look up and os is forwarded unchanged
// to findPIDFromExe. For every name, the first PID returned is wrapped in a
// gopsutil process handle and the extra telemetry dimensions are resolved once
// from the environment. A process that cannot be located or tracked is logged
// and skipped, so the returned aggregation may contain fewer entries than
// names supplied (possibly none).
func InitProcessAggregations(processName []string, os string) *ProcessAggregations {
	fmt.Println("Starting process aggregations")

	processAggregationsMap := make(map[string]*Process, len(processName))
	for _, name := range processName {
		pids, err := findPIDFromExe(name, os)
		if err != nil {
			fmt.Printf("Error getting PID for process %s: %s\n", name, err.Error())
			continue
		}
		// A successful lookup can still yield no PIDs. This case must be
		// handled separately because err is nil here and calling err.Error()
		// would panic.
		if len(pids) == 0 {
			fmt.Printf("Error getting PID for process %s: no matching process found\n", name)
			continue
		}

		process, err := stats.NewProcess(pids[0])
		if err != nil {
			fmt.Printf("Error tracking process %s: %s\n", name, err.Error())
			continue
		}

		processAggregationsMap[name] = &Process{
			processName:         name,
			processPID:          pids[0],
			process:             process,
			telemetryDimensions: getExtraDimensions(name), // Set dimensions from env vars once
		}
	}

	return &ProcessAggregations{
		processMap: processAggregationsMap,
	}
}
// Run starts the sample collection loop and the telemetry reporting loop, each
// in its own goroutine, and returns immediately.
func (pa *ProcessAggregations) Run() {
	for _, loop := range []func(){pa.CollectStats, pa.SendToAppInsights} {
		go loop()
	}
}
// CollectStats samples CPU percentage and RSS for every tracked process, once
// immediately and then every five seconds. It never returns.
//
// Each successful reading is appended to the process's sample buffer, which is
// then re-sorted so the buffer is always in ascending order. A failed reading
// is skipped for that tick. The mutex is held for the duration of each pass.
func (pa *ProcessAggregations) CollectStats() {
	const collectionInterval = 5 * time.Second

	ticker := time.NewTicker(collectionInterval)
	for {
		pa.mu.Lock()
		for _, proc := range pa.processMap {
			// An interval of 0 means to use the delta with the previous CPU seconds reading.
			if cpuPercent, err := proc.process.Percent(0); err == nil {
				proc.cpuValues = append(proc.cpuValues, cpuPercent)
				proc.cpuValues.Sort()
			}

			if memInfo, err := proc.process.MemoryInfo(); err == nil {
				proc.memValues = append(proc.memValues, float64(memInfo.RSS))
				proc.memValues.Sort()
			}
		}
		Log("Collected process stats")
		pa.mu.Unlock()

		// Wait for the next tick only after the pass, so the first pass runs right away.
		<-ticker.C
	}
}
// SendToAppInsights reports the 50th and 95th percentile CPU and memory usage
// of every tracked process, once immediately and then every 300 seconds. It
// never returns.
//
// Metrics are only emitted for buffers that contain samples. The extra
// telemetry dimensions are attached to the CPU metrics only. After reporting a
// process, both of its sample buffers are reset for the next period. The mutex
// is held for the duration of each pass.
func (pa *ProcessAggregations) SendToAppInsights() {
	const reportInterval = 300 * time.Second
	reportedPercentiles := []int{50, 95}

	ticker := time.NewTicker(reportInterval)
	for {
		pa.mu.Lock()
		for name, proc := range pa.processMap {
			// The buffers cannot change while the lock is held, so the
			// emptiness checks are the same for every percentile.
			hasCPUSamples := len(proc.cpuValues) > 0
			hasMemSamples := len(proc.memValues) > 0

			for _, pct := range reportedPercentiles {
				if hasCPUSamples {
					cpuTelemetry := createProcessMetric(name, "cpu_usage", pct, proc.cpuValues)
					// Attach the env-derived dimensions as metric properties.
					maps.Copy(cpuTelemetry.Properties, proc.telemetryDimensions)
					TelemetryClient.Track(cpuTelemetry)
				}
				if hasMemSamples {
					memTelemetry := createProcessMetric(name, "memory_rss", pct, proc.memValues)
					TelemetryClient.Track(memTelemetry)
				}
			}
			Log(fmt.Sprintf("Sent telemetry for process %s", name))

			// Start the next aggregation period with empty buffers.
			proc.cpuValues = sort.Float64Slice{}
			proc.memValues = sort.Float64Slice{}
		}
		pa.mu.Unlock()

		// Wait for the next tick only after the pass, so the first pass runs right away.
		<-ticker.C
	}
}
// getExtraDimensions resolves the additional telemetry dimensions for a
// process from environment variables.
//
// Only the "otelcollector" process has extra dimensions; any other name yields
// an empty map. The set of dimensions depends on the controller type read from
// the environment: "ReplicaSet" and "DaemonSet" each select their own table,
// and any other value selects none. Environment variables that are unset or
// empty are omitted. The returned map is never nil.
func getExtraDimensions(processName string) map[string]string {
	dimensions := make(map[string]string)
	if processName != "otelcollector" {
		return dimensions
	}

	// Stays nil for an unrecognized controller type; ranging over a nil map
	// below is a no-op.
	var nameToEnvVar map[string]string
	switch os.Getenv(envControllerType) {
	case "ReplicaSet":
		nameToEnvVar = replicasetDimensionsNameToEnvVar
	case "DaemonSet":
		nameToEnvVar = daemonsetDimensionsNameToEnvVar
	}

	for dimension, envVar := range nameToEnvVar {
		if value := os.Getenv(envVar); value != "" {
			dimensions[dimension] = value
		}
	}
	return dimensions
}
// createProcessMetric builds the metric telemetry item for one percentile of a
// process's samples.
//
// The metric is named "<lowercased processName>_<metricName>_0<percentile>" and
// its value is the element of values at the nearest-rank index
// round((len(values)-1) * percentile / 100). values is indexed directly and is
// not sorted here, so it is expected to already be in ascending order. Returns
// nil when values is empty.
func createProcessMetric(processName string, metricName string, percentile int, values sort.Float64Slice) *appinsights.MetricTelemetry {
	sampleCount := len(values)
	if sampleCount == 0 {
		return nil
	}

	name := fmt.Sprintf("%s_%s_0%d", strings.ToLower(processName), metricName, percentile)

	rank := float64(sampleCount-1) * float64(percentile) / 100.0
	idx := int(math.Round(rank))

	return appinsights.NewMetricTelemetry(name, values[idx])
}