pkg/profiling/continuous/triggers.go (92 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package continuous import ( "context" "fmt" "github.com/apache/skywalking-rover/pkg/core" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/profiling/continuous/base" "github.com/apache/skywalking-rover/pkg/profiling/continuous/trigger" "github.com/apache/skywalking-rover/pkg/profiling/task" taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3" "github.com/hashicorp/go-multierror" ) var triggerRegistration map[base.TargetProfilingType]base.Trigger func init() { triggerRegistration = make(map[base.TargetProfilingType]base.Trigger) triggerRegistration[base.TargetProfilingTypeOnCPU] = trigger.NewOnCPUTrigger() triggerRegistration[base.TargetProfilingTypeOffCPU] = trigger.NewOffCPUTrigger() triggerRegistration[base.TargetProfilingTypeNetwork] = trigger.NewNetworkTrigger() } type Triggers struct { taskManager *task.Manager continuousClient v3.ContinuousProfilingServiceClient ctx context.Context } func NewTriggers(ctx context.Context, moduleMgr *module.Manager, taskManager *task.Manager, conf *base.ContinuousConfig) (*Triggers, error) { coreOperator := moduleMgr.FindModule(core.ModuleName).(core.Operator) continuousClient := v3.NewContinuousProfilingServiceClient(coreOperator.BackendOperator().GetConnection()) var err error for _, t := range triggerRegistration { if e := t.Init(moduleMgr, conf); e != nil { err = multierror.Append(err, e) } } if err != nil { return nil, err } return &Triggers{ taskManager: taskManager, continuousClient: continuousClient, ctx: ctx, }, nil } func (m *Triggers) handleCauses(causes []base.ThresholdCause) { // generate the profiling tasks from the triggerRegistration profilingTypeWithCauses := make(map[base.TargetProfilingType][]base.ThresholdCause) for _, cause := range causes { profilingType := cause.FromPolicy().Policy.TargetProfilingType profilingTypeWithCauses[profilingType] = append(profilingTypeWithCauses[profilingType], cause) } for profilingType, ps := range profilingTypeWithCauses { if taskCount := triggerRegistration[profilingType].TriggerTasks(m, ps); taskCount > 0 { log.Infof("total generate %d %s tasks", taskCount, profilingType) } } } func (m *Triggers) ReportProcesses(process api.ProcessInterface, profilingProcesses []api.ProcessInterface, cases []base.ThresholdCause, taskSetter func(task *taskBase.ProfilingTask), reportSetter func(report *v3.ContinuousProfilingReport)) (*task.Context, error) { transferCauses := make([]*v3.ContinuousProfilingCause, 0) for _, c := range cases { transferCauses = append(transferCauses, c.GenerateTransferCause()) } // generate context taskContext, err := m.taskManager.BuildContextFromContinuous(profilingProcesses, taskSetter, func() (string, error) { report := &v3.ContinuousProfilingReport{ Layer: process.Entity().Layer, ServiceName: process.Entity().ServiceName, InstanceName: process.Entity().InstanceName, ProcessName: process.Entity().ProcessName, Causes: transferCauses, } reportSetter(report) profilingTask, err := m.continuousClient.ReportProfilingTask(m.ctx, report) if err != nil { return "", err } command := profilingTask.Commands[0] if len(profilingTask.Commands) != 1 || command.GetCommand() != "ContinuousProfilingReportTask" { return "", fmt.Errorf("the profiling task result is not right, command count: %d", len(profilingTask.Commands)) } for _, kv := range command.GetArgs() { if kv.GetKey() == "TaskId" { return kv.GetValue(), nil } } return "", fmt.Errorf("could not found the task ID from repoter") }) if err != nil { return nil, err } // execute task from context m.taskManager.StartTask(taskContext) return taskContext, nil }