pkg/profiling/continuous/checker/common/system_checker.go (103 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package common
import (
"fmt"
"strings"
"time"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
)
type SystemBasedChecker[V numbers] struct {
CheckType base.CheckType
MonitorType v3.ContinuousProfilingTriggeredMonitorType
ThresholdGenerate func(val string) (V, error)
DataGenerate func() (V, error)
GlobalWindows *base.TimeWindows[V, V]
Policies []*SystemBasedPolicy[V]
}
func NewSystemBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
dataGenerator func() (V, error), monitorType v3.ContinuousProfilingTriggeredMonitorType) *SystemBasedChecker[V] {
checker := &SystemBasedChecker[V]{
CheckType: checkType,
MonitorType: monitorType,
ThresholdGenerate: thresholdGenerator,
DataGenerate: dataGenerator,
GlobalWindows: base.NewTimeWindows[V, V](nil, func() base.WindowData[V, V] {
return base.NewLatestWindowData[V]()
}),
}
return checker
}
func (s *SystemBasedChecker[V]) SyncPolicies(policies []*base.SyncPolicyWithProcesses) {
result := make([]*SystemBasedPolicy[V], 0)
items := make([]*base.PolicyItem, 0)
for _, policyWithProcesses := range policies {
item := policyWithProcesses.Policy.Items[s.CheckType]
if item == nil {
continue
}
threshold, err := s.ThresholdGenerate(item.Threshold)
if err != nil {
log.Warnf("failure to parse the %s threshold: %v, error: %v", s.CheckType, item.Threshold, err)
continue
}
items = append(items, item)
processes := make([]api.ProcessInterface, 0)
for _, p := range policyWithProcesses.Processes {
processes = append(processes, p)
}
result = append(result, &SystemBasedPolicy[V]{
Threshold: threshold,
Policy: item,
Processes: processes,
})
}
s.Policies = result
s.GlobalWindows.ScalePeriod(items)
}
func (s *SystemBasedChecker[V]) Fetch() error {
if len(s.Policies) == 0 {
return nil
}
val, err := s.DataGenerate()
if err != nil {
return fmt.Errorf("get the system %s error: %v", s.CheckType, err)
}
s.GlobalWindows.Add(time.Now(), val)
return nil
}
func (s *SystemBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *base.MetricsAppender) []base.ThresholdCause {
if len(s.Policies) == 0 {
return nil
}
causes := make([]base.ThresholdCause, 0)
data, hasData := s.GlobalWindows.FlushMostRecentData()
for _, policy := range s.Policies {
if hasData {
for _, p := range policy.Processes {
metricsAppender.AppendProcessSingleValue(strings.ToLower(string(s.CheckType)), p, nil, float64(data))
}
}
lastMatch, isMatch := s.GlobalWindows.MatchRule(policy.Policy, func(val V) bool {
return val >= policy.Threshold
})
if !isMatch {
continue
}
for _, p := range policy.Processes {
if !ctx.ShouldCheck(p, policy.Policy) {
continue
}
causes = append(causes, NewSingleValueCause(p, policy.Policy, s.MonitorType, float64(policy.Threshold), float64(lastMatch)))
}
}
return causes
}
func (s *SystemBasedChecker[V]) Close() error {
return nil
}
type SystemBasedPolicy[V numbers] struct {
Threshold V
Policy *base.PolicyItem
Processes []api.ProcessInterface
}