pkg/profiling/continuous/trigger/common.go (121 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package trigger
import (
"fmt"
"time"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
"github.com/apache/skywalking-rover/pkg/profiling/task"
taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
)
var log = logger.GetLogger("profiling", "continuous", "trigger")
type BaseTrigger struct {
profilingCache map[string]*task.Context
executeTime time.Duration
silenceTime time.Duration
profilingTaskDimension func(p api.ProcessInterface) string
mainProcessSelector func(ps []api.ProcessInterface) api.ProcessInterface
taskSetter func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause)
reportSetter func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause)
}
func NewSingleProcessBaseTrigger(conf *base.ContinuousConfig,
taskSetter func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause),
reportSetter func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause)) *BaseTrigger {
trigger := &BaseTrigger{
profilingTaskDimension: func(p api.ProcessInterface) string {
return p.ID()
},
mainProcessSelector: func(ps []api.ProcessInterface) api.ProcessInterface {
return ps[0]
},
taskSetter: taskSetter,
reportSetter: reportSetter,
profilingCache: make(map[string]*task.Context),
}
return trigger
}
func NewMultipleProcessBasedTrigger(conf *base.ContinuousConfig,
profilingTaskDimension func(p api.ProcessInterface) string,
mainProcessSelector func(ps []api.ProcessInterface) api.ProcessInterface,
taskSetter func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause),
reportSetter func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause)) *BaseTrigger {
trigger := &BaseTrigger{
profilingTaskDimension: profilingTaskDimension,
mainProcessSelector: mainProcessSelector,
taskSetter: taskSetter,
reportSetter: reportSetter,
profilingCache: make(map[string]*task.Context),
}
return trigger
}
func (c *BaseTrigger) Init(conf *base.ContinuousConfig) error {
executeDuration, err := time.ParseDuration(conf.Trigger.ExecuteDuration)
if err != nil {
return fmt.Errorf("check trigger task execute duration error: %v", err)
}
silenceDuration, err := time.ParseDuration(conf.Trigger.SilenceDuration)
if err != nil {
return fmt.Errorf("check trigger task silence duration error: %v", err)
}
c.executeTime = executeDuration
c.silenceTime = silenceDuration
return nil
}
func (c *BaseTrigger) ShouldTrigger(p api.ProcessInterface) bool {
return c.shouldTriggerFromDimension(c.profilingTaskDimension(p))
}
func (c *BaseTrigger) TriggerTasks(reporter base.TriggerReporter, causes []base.ThresholdCause) int {
executeCount := 0
// build needs profiling processes data cache
// key: dimension, value: map[process][]thresholds
dimensionedProcessThresholds := make(map[string]map[api.ProcessInterface][]base.ThresholdCause)
for _, cause := range causes {
causeProcess := cause.Process()
dimension := c.profilingTaskDimension(causeProcess)
if !c.shouldTriggerFromDimension(dimension) {
continue
}
processThresholds := dimensionedProcessThresholds[dimension]
if processThresholds == nil {
processThresholds = make(map[api.ProcessInterface][]base.ThresholdCause)
dimensionedProcessThresholds[dimension] = processThresholds
}
processThresholds[causeProcess] = append(processThresholds[causeProcess], cause)
}
// reports task through cache
for dimension, processWithThresholds := range dimensionedProcessThresholds {
processes := make([]api.ProcessInterface, 0)
var mainProcess api.ProcessInterface
for process := range processWithThresholds {
processes = append(processes, process)
}
if len(processes) == 1 {
mainProcess = processes[0]
} else {
mainProcess = c.mainProcessSelector(processes)
}
thresholdCauses := processWithThresholds[mainProcess]
taskContext, err := reporter.ReportProcesses(mainProcess, processes, thresholdCauses,
func(task *taskBase.ProfilingTask) {
task.MaxRunningDuration = c.executeTime
c.taskSetter(task, processes, thresholdCauses)
}, func(report *v3.ContinuousProfilingReport) {
report.Duration = int32(c.executeTime.Seconds())
c.reportSetter(report, processes, thresholdCauses)
})
if err != nil {
log.Warnf("failure to report the cause, process id: %s, error: %v", mainProcess.ID(), err)
continue
}
c.profilingCache[dimension] = taskContext
executeCount++
}
return executeCount
}
func (c *BaseTrigger) shouldTriggerFromDimension(dimension string) bool {
t := c.profilingCache[dimension]
if t == nil {
return true
} else if t.IsRunning() {
return false
}
return !t.RunningTime().IsZero() && time.Since(t.RunningTime()) > c.silenceTime
}