in pkg/profiling/continuous/trigger/common.go [98:151]
func (c *BaseTrigger) TriggerTasks(reporter base.TriggerReporter, causes []base.ThresholdCause) int {
executeCount := 0
// build needs profiling processes data cache
// key: dimension, value: map[process][]thresholds
dimensionedProcessThresholds := make(map[string]map[api.ProcessInterface][]base.ThresholdCause)
for _, cause := range causes {
causeProcess := cause.Process()
dimension := c.profilingTaskDimension(causeProcess)
if !c.shouldTriggerFromDimension(dimension) {
continue
}
processThresholds := dimensionedProcessThresholds[dimension]
if processThresholds == nil {
processThresholds = make(map[api.ProcessInterface][]base.ThresholdCause)
dimensionedProcessThresholds[dimension] = processThresholds
}
processThresholds[causeProcess] = append(processThresholds[causeProcess], cause)
}
// reports task through cache
for dimension, processWithThresholds := range dimensionedProcessThresholds {
processes := make([]api.ProcessInterface, 0)
var mainProcess api.ProcessInterface
for process := range processWithThresholds {
processes = append(processes, process)
}
if len(processes) == 1 {
mainProcess = processes[0]
} else {
mainProcess = c.mainProcessSelector(processes)
}
thresholdCauses := processWithThresholds[mainProcess]
taskContext, err := reporter.ReportProcesses(mainProcess, processes, thresholdCauses,
func(task *taskBase.ProfilingTask) {
task.MaxRunningDuration = c.executeTime
c.taskSetter(task, processes, thresholdCauses)
}, func(report *v3.ContinuousProfilingReport) {
report.Duration = int32(c.executeTime.Seconds())
c.reportSetter(report, processes, thresholdCauses)
})
if err != nil {
log.Warnf("failure to report the cause, process id: %s, error: %v", mainProcess.ID(), err)
continue
}
c.profilingCache[dimension] = taskContext
executeCount++
}
return executeCount
}