in plugins/forwarder/grpc/nativeasyncprofiler/forwarder.go [78:118]
func (f *Forwarder) SyncForward(e *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
switch requestData := e.GetData().(type) {
case *v1.SniffData_AsyncProfilerTaskCommandQuery:
query := requestData.AsyncProfilerTaskCommandQuery
commands, err := f.profilingClient.GetAsyncProfilerTaskCommands(context.Background(), query)
if err != nil {
return nil, nil, err
}
return &v1.SniffData{Data: &v1.SniffData_Commands{Commands: commands}}, nil, nil
case *v1.SniffData_AsyncProfilerData:
// metadata
ctx := context.WithValue(context.Background(), client_grpc.CtxBidirectionalStreamKey, true)
stream, err := f.profilingClient.Collect(ctx)
if err != nil {
log.Logger.Errorf("%s open collect stream error: %v", f.Name(), err)
return nil, nil, err
}
metaData := server_grpc.NewOriginalData(requestData.AsyncProfilerData)
err = stream.SendMsg(metaData)
if err != nil {
log.Logger.Errorf("%s send meta data error: %v", f.Name(), err)
f.closeStream(stream)
return nil, nil, err
}
asyncProfilerCollectionResponse, err := stream.Recv()
if err != nil {
log.Logger.Errorf("%s receive meta data error: %v", f.Name(), err)
f.closeStream(stream)
return nil, nil, err
}
return &v1.SniffData{
Data: &v1.SniffData_AsyncProfilerCollectionResponse{
AsyncProfilerCollectionResponse: asyncProfilerCollectionResponse,
},
}, stream, nil
}
return nil, nil, fmt.Errorf("unsupport data")
}