func()

in plugins/forwarder/grpc/nativeasyncprofiler/forwarder.go [78:118]


func (f *Forwarder) SyncForward(e *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
	switch requestData := e.GetData().(type) {
	case *v1.SniffData_AsyncProfilerTaskCommandQuery:
		query := requestData.AsyncProfilerTaskCommandQuery
		commands, err := f.profilingClient.GetAsyncProfilerTaskCommands(context.Background(), query)
		if err != nil {
			return nil, nil, err
		}

		return &v1.SniffData{Data: &v1.SniffData_Commands{Commands: commands}}, nil, nil
	case *v1.SniffData_AsyncProfilerData:
		// metadata
		ctx := context.WithValue(context.Background(), client_grpc.CtxBidirectionalStreamKey, true)
		stream, err := f.profilingClient.Collect(ctx)
		if err != nil {
			log.Logger.Errorf("%s open collect stream error: %v", f.Name(), err)
			return nil, nil, err
		}
		metaData := server_grpc.NewOriginalData(requestData.AsyncProfilerData)
		err = stream.SendMsg(metaData)
		if err != nil {
			log.Logger.Errorf("%s send meta data error: %v", f.Name(), err)
			f.closeStream(stream)
			return nil, nil, err
		}
		asyncProfilerCollectionResponse, err := stream.Recv()
		if err != nil {
			log.Logger.Errorf("%s receive meta data error: %v", f.Name(), err)
			f.closeStream(stream)
			return nil, nil, err
		}

		return &v1.SniffData{
			Data: &v1.SniffData_AsyncProfilerCollectionResponse{
				AsyncProfilerCollectionResponse: asyncProfilerCollectionResponse,
			},
		}, stream, nil
	}

	return nil, nil, fmt.Errorf("unsupport data")
}