func()

in plugins/receiver/grpc/nativeasyncprofiler/async_profiler_service.go [56:102]


func (p *AsyncProfilerService) Collect(clientStream asyncprofiler.AsyncProfilerTask_CollectServer) error {
	metaData := grpc.NewOriginalData(nil)
	err := clientStream.RecvMsg(metaData)
	if err == io.EOF {
		return nil
	}
	if err != nil {
		return err
	}
	event := &v1.SniffData{
		Data: &v1.SniffData_AsyncProfilerData{
			AsyncProfilerData: metaData.Content,
		},
	}
	// send metadata to server
	serverStreamAndResp, serverStream, err := p.SyncInvoker.SyncInvoke(event)
	if err != nil {
		return fmt.Errorf("satellite send metadata to server but get err: %s", err)
	}
	data := serverStreamAndResp.GetData().(*v1.SniffData_AsyncProfilerCollectionResponse)
	// send response to client
	err = clientStream.Send(data.AsyncProfilerCollectionResponse)
	if err == io.EOF {
		return nil
	}
	if err != nil {
		return err
	}

	// receive jfr content and send
	for {
		jfrContent := grpc.NewOriginalData(nil)
		err := clientStream.RecvMsg(jfrContent)
		if err == io.EOF {
			if err = serverStream.CloseSend(); err != nil {
				log.Logger.Errorf("async profiler service close server stream error: %s", err)
			}
			return nil
		}
		if err != nil {
			return err
		}
		if err = serverStream.SendMsg(jfrContent); err != nil {
			return fmt.Errorf("satellite send jfr content to server but get err: %s ", err)
		}
	}
}