func()

in pkg/profiling/task/manager.go [341:387]


// FlushProfilingData drains the buffered data of every task in m.tasks and
// sends it to the backend over a single CollectProfilingData client stream.
//
// It returns nil without opening a stream when there are no tasks. A task
// whose runner fails to flush is logged and skipped; a task with no data is
// skipped silently. Only the first data element of each task carries the task
// metadata. The first send failure aborts the whole flush: the stream is
// closed and the resulting error is returned. Otherwise the error (if any) of
// closing the stream is returned.
//
// checkStoppedTaskAndRemoved is always invoked when the method returns.
func (m *Manager) FlushProfilingData() error {
	// clean up the stopped tasks only after the flush, to make sure all of
	// their profiling data has been sent first
	defer m.checkStoppedTaskAndRemoved()
	if len(m.tasks) == 0 {
		return nil
	}

	stream, err := m.profilingClient.CollectProfilingData(m.ctx)
	if err != nil {
		return err
	}
	// a single timestamp is shared by every task flushed in this round
	currentMilli := time.Now().UnixMilli()
	totalSendCount := make(map[string]int)
	for _, t := range m.tasks {
		data, flushErr := t.runner.FlushData()
		if flushErr != nil {
			log.Warnf("reading profiling task data failure. taskId: %s, error: %v", t.task.TaskID, flushErr)
			continue
		}

		if len(data) == 0 {
			continue
		}

		totalSendCount[t.TaskID()] += len(data)
		// only the first data have task metadata
		metadata := &profiling_v3.EBPFProfilingTaskMetadata{
			TaskId:             t.task.TaskID,
			ProfilingStartTime: t.startRunningTime.UnixMilli(),
			CurrentTime:        currentMilli,
		}
		// the profiling(draw flame-graph) task usually have the one process only;
		// guard the index so a task without any process cannot panic the flush
		if len(t.task.ProcessIDList) > 0 {
			metadata.ProcessId = t.task.ProcessIDList[0]
		} else {
			log.Warnf("profiling task has no process, sending data without process id. taskId: %s", t.task.TaskID)
		}
		data[0].Task = metadata

		for _, d := range data {
			// send each data, stop flush data if the stream have found error
			if sendErr := stream.Send(d); sendErr != nil {
				// Finish the stream instead of abandoning it half-open. When the
				// remote side terminated the stream, Send only reports a generic
				// end-of-stream error and the actual status is delivered by the
				// receive side, so prefer the error from closing when present.
				if _, closeErr := stream.CloseAndRecv(); closeErr != nil {
					return closeErr
				}
				return sendErr
			}
		}
	}

	if len(totalSendCount) > 0 {
		log.Infof("send profiling data summary: %v", totalSendCount)
	}
	_, err = stream.CloseAndRecv()
	return err
}