pkg/profiling/continuous/checker/common/process_checker.go (118 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package common import ( "fmt" "strings" "time" "github.com/hashicorp/go-multierror" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/profiling/continuous/base" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3" ) type ProcessBasedChecker[V numbers] struct { *BaseChecker[*ProcessBasedInfo[V]] CheckType base.CheckType MonitorType v3.ContinuousProfilingTriggeredMonitorType ThresholdGenerate func(val string) (V, error) DataGenerate func(process api.ProcessInterface) (V, error) } func NewProcessBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error), dataGenerator func(p api.ProcessInterface) (V, error), monitorType v3.ContinuousProfilingTriggeredMonitorType) *ProcessBasedChecker[V] { checker := &ProcessBasedChecker[V]{ CheckType: checkType, MonitorType: monitorType, ThresholdGenerate: thresholdGenerator, DataGenerate: dataGenerator, } checker.BaseChecker = NewBaseChecker[*ProcessBasedInfo[V]]( func(p api.ProcessInterface, older *ProcessBasedInfo[V], items []*base.PolicyItem) *ProcessBasedInfo[V] { var win *base.TimeWindows[V, V] if older != nil { win = older.Windows older.Windows.ScalePeriod(items) } else { win = base.NewTimeWindows[V, V](items, func() base.WindowData[V, V] { return base.NewLatestWindowData[V]() }) } policies := make([]*ProcessBasedPolicy[V], 0) for _, i := range items { threshold, _ := thresholdGenerator(i.Threshold) policies = append(policies, &ProcessBasedPolicy[V]{ Threshold: threshold, Policy: i, }) } return &ProcessBasedInfo[V]{ Process: p, Windows: win, Policies: policies, } }) return checker } func (r *ProcessBasedChecker[V]) SyncPolicies(policies []*base.SyncPolicyWithProcesses) { r.BaseChecker.SyncPolicies(policies, func(items map[base.CheckType]*base.PolicyItem) *base.PolicyItem { item := items[r.CheckType] if item == nil { return nil } _, err := r.ThresholdGenerate(item.Threshold) if err != nil { log.Warnf("failure to parse the %s threshold: %v, error: %v", r.CheckType, item.Threshold, err) return nil } return item }, nil) } func (r *ProcessBasedChecker[V]) Fetch() error { if len(r.PidWithInfos) == 0 { return nil } var result error now := time.Now() for _, info := range r.PidWithInfos { val, err := r.DataGenerate(info.Process) if err != nil { result = multierror.Append(result, fmt.Errorf("get the process %s failure, pid: %d, error: %v", r.CheckType, info.Process.Pid(), err)) continue } info.Windows.Add(now, val) } return result } func (r *ProcessBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *base.MetricsAppender) []base.ThresholdCause { if len(r.PidWithInfos) == 0 { return nil } causes := make([]base.ThresholdCause, 0) for _, info := range r.PidWithInfos { for _, threshold := range info.Policies { if data, hasData := info.Windows.FlushMostRecentData(); hasData { metricsAppender.AppendProcessSingleValue(strings.ToLower(string(r.CheckType)), info.Process, nil, float64(data)) } if !ctx.ShouldCheck(info.Process, threshold.Policy) { continue } // check is reach the threshold if lastMatch, enable := info.Windows.MatchRule(threshold.Policy, func(val V) bool { return val >= threshold.Threshold }); enable { causes = append(causes, NewSingleValueCause(info.Process, threshold.Policy, r.MonitorType, float64(threshold.Threshold), float64(lastMatch))) } } } return causes } func (r *ProcessBasedChecker[V]) Close() error { return nil } type ProcessBasedInfo[V numbers] struct { Process api.ProcessInterface Windows *base.TimeWindows[V, V] Policies []*ProcessBasedPolicy[V] } type ProcessBasedPolicy[V numbers] struct { Threshold V Policy *base.PolicyItem }