in pkg/profiling/task/manager.go [341:387]
// FlushProfilingData drains the pending data of every task in m.tasks and
// uploads it through a single CollectProfilingData client stream.
//
// It returns nil immediately when there are no tasks. Otherwise it returns the
// error from opening the stream, the first error reported by stream.Send, or
// the error from stream.CloseAndRecv. A task whose runner fails to flush is
// logged and skipped; it does not abort the upload of the remaining tasks.
// checkStoppedTaskAndRemoved always runs when the function exits.
func (m *Manager) FlushProfilingData() error {
	// cleanup the stopped tasks after flushing, to make sure all their profiling data has been sent
	defer m.checkStoppedTaskAndRemoved()

	if len(m.tasks) == 0 {
		return nil
	}

	stream, err := m.profilingClient.CollectProfilingData(m.ctx)
	if err != nil {
		return err
	}

	// one timestamp shared by every task metadata sent in this flush
	currentMilli := time.Now().UnixMilli()
	totalSendCount := make(map[string]int)
	for _, t := range m.tasks {
		data, flushErr := t.runner.FlushData()
		if flushErr != nil {
			log.Warnf("reading profiling task data failure. taskId: %s, error: %v", t.task.TaskID, flushErr)
			continue
		}
		if len(data) == 0 {
			continue
		}
		totalSendCount[t.TaskID()] += len(data)

		// only the first data have task metadata
		// NOTE(review): ProcessIDList[0] panics on an empty list; this assumes tasks
		// are validated to hold at least one process before they get here — confirm.
		data[0].Task = &profiling_v3.EBPFProfilingTaskMetadata{
			TaskId:             t.task.TaskID,
			ProcessId:          t.task.ProcessIDList[0], // the profiling(draw flame-graph) task usually have the one process only
			ProfilingStartTime: t.startRunningTime.UnixMilli(),
			CurrentTime:        currentMilli,
		}

		for _, d := range data {
			sendErr := stream.Send(d)
			if sendErr == nil {
				continue
			}
			// Stop flushing on the first send failure. When the stream was
			// terminated by the server or the transport, Send only reports a bare
			// EOF and the real RPC status has to be read from the receive side, so
			// drain the stream and surface that status before giving up.
			if _, recvErr := stream.CloseAndRecv(); recvErr != nil {
				log.Warnf("profiling data stream terminated. taskId: %s, status: %v", t.task.TaskID, recvErr)
			}
			return sendErr
		}
	}

	if len(totalSendCount) > 0 {
		log.Infof("send profiling data summary: %v", totalSendCount)
	}

	// half-close the stream and wait for the server to acknowledge the upload
	_, err = stream.CloseAndRecv()
	return err
}