pkg/profiling/task/manager.go (296 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package task import ( "context" "fmt" "sync" "time" "github.com/apache/skywalking-rover/pkg/core" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/profiling/task/base" common_v3 "skywalking.apache.org/repo/goapi/collect/common/v3" profiling_v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3" ) var log = logger.GetLogger("profiling", "task") type Manager struct { moduleMgr *module.Manager processOperator process.Operator profilingClient profiling_v3.EBPFProfilingServiceClient ctx context.Context cancel context.CancelFunc taskConfig *base.TaskConfig tasks map[string]*Context instanceID string lastUpdateTime int64 } func NewManager(ctx context.Context, moduleMgr *module.Manager, taskConfig *base.TaskConfig) (*Manager, error) { coreOperator := moduleMgr.FindModule(core.ModuleName).(core.Operator) connection := coreOperator.BackendOperator().GetConnection() profilingClient := profiling_v3.NewEBPFProfilingServiceClient(connection) processOperator := moduleMgr.FindModule(process.ModuleName).(process.Operator) if err := CheckProfilingTaskConfig(taskConfig, moduleMgr); err != nil { return nil, err } ctx, cancel := context.WithCancel(ctx) manager := &Manager{ moduleMgr: moduleMgr, processOperator: processOperator, profilingClient: profilingClient, taskConfig: taskConfig, tasks: make(map[string]*Context), instanceID: coreOperator.InstanceID(), ctx: ctx, cancel: cancel, } return manager, nil } func (m *Manager) Start() { } func (m *Manager) BuildContextFromCommand(command *common_v3.Command) (*Context, error) { // analyze command t, err := base.ProfilingTaskFromCommand(command) if err != nil || t == nil { return nil, fmt.Errorf("parsing profiling task failure, command: %v, reason: %v", command.GetArgs(), err) } // find processes processes := make([]api.ProcessInterface, 0) for _, processID := range t.ProcessIDList { taskProcess := m.processOperator.FindProcessByID(processID) if taskProcess == nil { return nil, fmt.Errorf("could not found %s processes %s", t.TaskID, t.ProcessIDList) } processes = append(processes, taskProcess) } taskContext := &Context{task: t, processes: processes, status: NotRunning, recalcDuration: make(chan bool, 1)} // check existing task, extended the running time existTask := m.tasks[taskContext.BuildTaskIdentity()] // if task are same, then just rewrite the task information and return if existTask != nil && existTask.IsSameTask(taskContext) { existTask.task = t return existTask, nil } // init runner if err := m.initTaskContext(taskContext); err != nil { return nil, err } return taskContext, nil } func (m *Manager) BuildContextFromContinuous(processes []api.ProcessInterface, taskSetter func(task *base.ProfilingTask), taskIDGenerator func() (string, error)) (*Context, error) { task := base.ProfilingTaskFromContinuous(processes, taskSetter) taskContext := &Context{task: task, processes: processes, status: NotRunning, recalcDuration: make(chan bool, 1)} // if the task already exist, then return error existTask := m.tasks[taskContext.BuildTaskIdentity()] // if task are same, then just rewrite the task information and return if existTask != nil && existTask.IsSameTask(taskContext) { return nil, fmt.Errorf("already exist profiling task, so ignore") } taskID, err := taskIDGenerator() if err != nil { return nil, err } task.TaskID = taskID // init runner if err := m.initTaskContext(taskContext); err != nil { return nil, err } return taskContext, nil } func (m *Manager) initTaskContext(taskContext *Context) error { // init runner var r base.ProfileTaskRunner task := taskContext.task if runner, err := NewProfilingRunner(task.TargetType, m.taskConfig, m.moduleMgr); err != nil { return err } else if err := runner.Init(task, taskContext.processes); err != nil { return fmt.Errorf("could not init %s runner for task: %s: %v", task.TriggerType, task.TaskID, err) } else { r = runner } taskContext.runner = r taskContext.ctx, taskContext.cancel = context.WithCancel(m.ctx) return nil } func (m *Manager) StartTask(c *Context) { // shutdown task if exists taskIdentity := c.BuildTaskIdentity() existTask := m.tasks[taskIdentity] if existTask != nil { // just extend the task time if the task are same if c.IsSameTask(existTask) { // notify to re-calculate the task duration(task stop timer) c.recalcDuration <- true return } // close task if not same id := m.tasks[taskIdentity].TaskID() log.Infof("existing profiling task: %s, so need to stop it", id) if err := m.ShutdownAndRemoveTask(m.tasks[taskIdentity]); err != nil { log.Warnf("shutdown existing profiling task failure, so cannot to start new profiling task: %v. reason: %v", c.task.TaskID, err) return } } currentMilli := time.Now().UnixNano() / int64(time.Millisecond) m.tasks[taskIdentity] = c // already reach time if currentMilli >= c.task.StartTime { m.runTask(c) return } // schedule to execute afterRun := time.Since(time.UnixMilli(c.task.StartTime)) go func() { select { case <-time.After(afterRun): log.Infof("the profiling task need to wait %fmin to run: %s", afterRun.Minutes(), c.TaskID()) m.runTask(c) case <-c.ctx.Done(): return } }() } func (m *Manager) runTask(c *Context) { log.Infof("ready to starting profiling task: %s", c.TaskID()) var wg sync.WaitGroup wg.Add(1) c.runningWg = &wg go func() { defer func() { wg.Done() c.status = Stopped }() notify := func() { c.status = Running c.startRunningTime = time.Now() m.afterProfilingStartSuccess(c) } // start running if err := c.runner.Run(m.ctx, notify); err != nil { log.Warnf("executing profiling task failure, taskId: %s, reason: %v", c.task.TaskID, err) } }() } func (m *Manager) afterProfilingStartSuccess(c *Context) { pidList := make([]int32, 0) for _, p := range c.processes { pidList = append(pidList, p.Pid()) } log.Infof("profiling task has been started. taskId: %s, pid: %d", c.task.TaskID, pidList) go func() { for { endTime := c.startRunningTime.Add(c.task.MaxRunningDuration) select { // shutdown task when arrived task running task case <-time.After(time.Until(endTime)): log.Infof("arrived task running time, shutting down task: %s", c.task.TaskID) if err := m.shutdownTask(c); err != nil { log.Warnf("shutting down task failure: %s, reason: %v", c.task.TaskID, err) } return case <-c.recalcDuration: // re-calculate the task end-time log.Infof("received the extend duration task, task id: %s", c.task.TaskID) continue // shutdown when context finished case <-c.ctx.Done(): if err := m.shutdownTask(c); err != nil { log.Warnf("shutting down task failure: %s, reason: %v", c.task.TaskID, err) } return } } }() } func (m *Manager) shutdownTask(c *Context) error { // return if not running if c.runningWg == nil { return nil } defer func() { if r := recover(); r != nil { log.Warnf("recover from shutdown task, id: %s, error: %v", c.TaskID(), r) } }() err := c.runner.Stop() c.runningWg.Wait() c.cancel() return err } func (m *Manager) ShutdownAndRemoveTask(c *Context) error { err := m.shutdownTask(c) delete(m.tasks, c.BuildTaskIdentity()) return err } func (m *Manager) Shutdown() error { m.cancel() return nil } func (m *Manager) checkStoppedTaskAndRemoved() { for identity, t := range m.tasks { if t.status == Stopped { delete(m.tasks, identity) } } } func (m *Manager) StartingWatchTask() error { // query task tasks, err := m.profilingClient.QueryTasks(m.ctx, &profiling_v3.EBPFProfilingTaskQuery{ RoverInstanceId: m.instanceID, LatestUpdateTime: m.lastUpdateTime, }) if err != nil { return err } if len(tasks.Commands) == 0 { return nil } // analyze profiling tasks taskContexts := make([]*Context, 0) lastUpdateTime := m.lastUpdateTime for _, cmd := range tasks.Commands { taskContext, err := m.BuildContextFromCommand(cmd) if err != nil { log.Warnf("could not execute task, ignored. %v", err) continue } if taskContext.UpdateTime() > lastUpdateTime { lastUpdateTime = taskContext.UpdateTime() } if !taskContext.CheckTaskRunnable() { continue } taskContexts = append(taskContexts, taskContext) } // update last task time m.lastUpdateTime = lastUpdateTime if len(taskContexts) == 0 { return nil } taskIDList := make([]string, len(taskContexts)) for inx, c := range taskContexts { taskIDList[inx] = c.TaskID() } log.Infof("received %d profiling task: %v", len(taskContexts), taskIDList) // start tasks for _, t := range taskContexts { m.StartTask(t) } return nil } func (m *Manager) FlushProfilingData() error { // cleanup the stopped after flush profiling data to make sure all the profiling data been sent defer m.checkStoppedTaskAndRemoved() if len(m.tasks) == 0 { return nil } stream, err := m.profilingClient.CollectProfilingData(m.ctx) if err != nil { return err } currentMilli := time.Now().UnixMilli() totalSendCount := make(map[string]int) for _, t := range m.tasks { data, err1 := t.runner.FlushData() if err1 != nil { log.Warnf("reading profiling task data failure. taskId: %s, error: %v", t.task.TaskID, err1) continue } if len(data) == 0 { continue } totalSendCount[t.TaskID()] += len(data) // only the first data have task metadata data[0].Task = &profiling_v3.EBPFProfilingTaskMetadata{ TaskId: t.task.TaskID, ProcessId: t.task.ProcessIDList[0], // the profiling(draw flame-graph) task usually have the one process only ProfilingStartTime: t.startRunningTime.UnixMilli(), CurrentTime: currentMilli, } for _, d := range data { // send each data, stop flush data if the stream have found error if err1 := stream.Send(d); err1 != nil { return err1 } } } if len(totalSendCount) > 0 { log.Infof("send profiling data summary: %v", totalSendCount) } _, err = stream.CloseAndRecv() return err }