pkg/profiling/continuous/checker/common/http_checker.go (199 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package common
import (
"regexp"
"strings"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
"github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/bpf/network"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
)
type HTTPBasedChecker struct {
*BaseChecker[*HTTPBasedCheckerProcessInfo]
CheckType base.CheckType
MonitorType v3.ContinuousProfilingTriggeredMonitorType
ThresholdGenerate func(val string) (float64, error)
}
func NewHTTPBasedChecker(checkType base.CheckType,
thresholdGenerator func(val string) (float64, error), dataGenerator func() base.WindowData[network.BufferEvent, float64],
monitorType v3.ContinuousProfilingTriggeredMonitorType) *HTTPBasedChecker {
checker := &HTTPBasedChecker{
CheckType: checkType,
ThresholdGenerate: thresholdGenerator,
MonitorType: monitorType,
}
checker.BaseChecker = NewBaseChecker[*HTTPBasedCheckerProcessInfo](
func(p api.ProcessInterface, older *HTTPBasedCheckerProcessInfo, items []*base.PolicyItem) *HTTPBasedCheckerProcessInfo {
result := &HTTPBasedCheckerProcessInfo{
Process: p,
PolicyWithWindows: make(map[*base.PolicyItem]*HTTPBasedCheckerPolicyItemWindows),
}
for _, item := range items {
val, _ := thresholdGenerator(item.Threshold)
policyInfo := &HTTPBasedCheckerPolicyItemWindows{
threshold: val,
}
timeWindowsUpdated := false
if older != nil {
for olderItem, olderInfo := range older.PolicyWithWindows {
// reading from the older policy info
if olderItem.SameURIFilter(item) {
if len(item.URIList) > 0 {
for _, w := range olderInfo.uriWithTimeWindows {
w.ScalePeriod([]*base.PolicyItem{item})
}
policyInfo.uriWithTimeWindows = olderInfo.uriWithTimeWindows
} else {
policyInfo.defaultTimeWindows = olderInfo.defaultTimeWindows
policyInfo.defaultTimeWindows.ScalePeriod([]*base.PolicyItem{item})
}
timeWindowsUpdated = true
break
}
}
}
if timeWindowsUpdated {
result.PolicyWithWindows[item] = policyInfo
continue
}
// otherwise, create the time windows
if len(item.URIList) > 0 {
uriWithWindows := make(map[string]*base.TimeWindows[network.BufferEvent, float64])
for _, uri := range item.URIList {
uriWithWindows[uri] = base.NewTimeWindows[network.BufferEvent, float64](
[]*base.PolicyItem{item}, func() base.WindowData[network.BufferEvent, float64] {
return dataGenerator()
})
}
policyInfo.uriWithTimeWindows = uriWithWindows
} else if item.URIRegex != "" {
regex, err := regexp.Compile(item.URIRegex)
if err != nil {
log.Warnf("error to compile the URI regex for policy, ignore this policy. regex: %s", item.URIRegex)
continue
}
policyInfo.uriRegex = regex
}
policyInfo.defaultTimeWindows = base.NewTimeWindows[network.BufferEvent, float64](
[]*base.PolicyItem{item}, func() base.WindowData[network.BufferEvent, float64] {
return dataGenerator()
})
result.PolicyWithWindows[item] = policyInfo
}
return result
})
network.AddEventNotify(checker)
return checker
}
func (n *HTTPBasedChecker) SyncPolicies(policies []*base.SyncPolicyWithProcesses) {
n.BaseChecker.SyncPolicies(policies, func(items map[base.CheckType]*base.PolicyItem) *base.PolicyItem {
item := items[n.CheckType]
if item == nil {
return nil
}
_, err := n.ThresholdGenerate(item.Threshold)
if err != nil {
log.Warnf("failure to parse the %s threshold to int: %v", n.CheckType, item.Threshold)
return nil
}
return item
}, func(pid int32, isDelete bool) {
// notify to the listener
var err error
defer func() {
if err != nil {
log.Warnf("process the pid monitoring failure, pid: %d, is delete: %t, erro: %v", pid, isDelete, err)
}
}()
if isDelete {
err = network.RemoveWatchProcess(pid, string(n.CheckType))
return
}
err = network.AddWatchProcess(pid, string(n.CheckType))
})
}
func (n *HTTPBasedChecker) ReceiveBufferEvent(event network.BufferEvent) {
info := n.PidWithInfos[event.Pid()]
if info == nil {
return
}
for _, policyInfo := range info.PolicyWithWindows {
var matchesWindows *base.TimeWindows[network.BufferEvent, float64]
// match with the regex or URI list
if len(policyInfo.uriWithTimeWindows) > 0 {
for uri, windows := range policyInfo.uriWithTimeWindows {
if event.RequestURI() == uri {
matchesWindows = windows
}
}
if matchesWindows == nil {
continue
}
} else if policyInfo.uriRegex != nil && !policyInfo.uriRegex.MatchString(event.RequestURI()) {
continue
}
if matchesWindows == nil {
matchesWindows = policyInfo.defaultTimeWindows
}
matchesWindows.Add(event.StartTime(), event)
}
}
func (n *HTTPBasedChecker) Fetch() error {
return nil
}
func (n *HTTPBasedChecker) Close() error {
return network.ForceShutdownBPF()
}
func (n *HTTPBasedChecker) Check(ctx base.CheckContext, metricsAppender *base.MetricsAppender) []base.ThresholdCause {
causes := make([]base.ThresholdCause, 0)
for _, pidPolicies := range n.PidWithInfos {
for item, itemInfo := range pidPolicies.PolicyWithWindows {
globalURI := ""
if itemInfo.uriRegex != nil {
globalURI = itemInfo.uriRegex.String()
}
for uri, windows := range itemInfo.uriWithTimeWindows {
n.flushMetrics(uri, windows, pidPolicies.Process, metricsAppender)
}
if itemInfo.defaultTimeWindows != nil {
n.flushMetrics(globalURI, itemInfo.defaultTimeWindows, pidPolicies.Process, metricsAppender)
}
if !ctx.ShouldCheck(pidPolicies.Process, item) {
continue
}
// url list checker
for uri, window := range itemInfo.uriWithTimeWindows {
if lastMatch, isMatch := window.MatchRule(item, func(val float64) bool {
return val >= itemInfo.threshold
}); isMatch {
causes = append(causes, NewURICause(pidPolicies.Process, false, uri, item,
n.MonitorType, itemInfo.threshold, lastMatch))
}
}
// regex or global
if lastMatch, isMatch := itemInfo.defaultTimeWindows.MatchRule(item, func(val float64) bool {
return val >= itemInfo.threshold
}); isMatch {
causes = append(causes, NewURICause(pidPolicies.Process, itemInfo.uriRegex != nil, globalURI, item,
n.MonitorType, itemInfo.threshold, lastMatch))
}
}
}
return causes
}
func (n *HTTPBasedChecker) flushMetrics(uri string, windows *base.TimeWindows[network.BufferEvent, float64],
process api.ProcessInterface, metricsAppender *base.MetricsAppender) {
if uri == "" {
uri = "global"
}
if data, hasUpdate := windows.FlushMultipleRecentData(); hasUpdate {
// flush each slot data
for _, d := range data {
metricsAppender.AppendProcessSingleValue(strings.ToLower(string(n.CheckType)), process, map[string]string{
"uri": uri,
}, d)
}
}
}
type HTTPBasedCheckerProcessInfo struct {
Process api.ProcessInterface
PolicyWithWindows map[*base.PolicyItem]*HTTPBasedCheckerPolicyItemWindows
}
type HTTPBasedCheckerPolicyItemWindows struct {
uriWithTimeWindows map[string]*base.TimeWindows[network.BufferEvent, float64]
uriRegex *regexp.Regexp
defaultTimeWindows *base.TimeWindows[network.BufferEvent, float64]
threshold float64
}