func()

in plugins/forwarder/grpc/nativetracing/forwarder.go [84:129]


// Forward sends every event in batch over a gRPC client stream selected by the
// event's payload type:
//
//   - *v1.SniffData_Segment payloads are sent on a stream obtained from
//     f.tracingClient.Collect.
//   - *v1.SniffData_SpanAttachedEvent payloads are sent on a stream obtained
//     from f.attachedEventClient.Collect.
//
// Each stream is opened lazily, the first time an event of its type is seen,
// and is reused for the rest of the batch. Events carrying any other payload
// type are skipped.
//
// Forward returns the first error encountered while opening a stream or
// sending a message; in that case any stream opened so far is closed on a
// best-effort basis and a close failure is only logged. When every event has
// been sent, the streams are closed exactly once and the result of that close
// is returned.
func (f *Forwarder) Forward(batch event.BatchEvents) error {
	var tracingStream agent.TraceSegmentReportService_CollectClient
	var spanStream agent.SpanAttachedEventReportService_CollectClient

	// closed records that the success path has taken over closing the streams,
	// so the deferred cleanup must not close them a second time.
	closed := false
	defer func() {
		if closed {
			return
		}
		// Early-return path: release whatever was opened. The caller already
		// receives the original error, so a close failure is only logged.
		if err := closeStream(tracingStream, spanStream); err != nil {
			log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
		}
	}()

	var err error
	var stream streaming
	var streamData *server_grpc.OriginalData
	for _, e := range batch {
		switch data := e.GetData().(type) {
		case *v1.SniffData_Segment:
			if tracingStream == nil {
				tracingStream, err = f.tracingClient.Collect(context.Background())
				if err != nil {
					log.Logger.Errorf("open grpc stream error %v", err)
					return err
				}
			}
			stream = tracingStream
			streamData = server_grpc.NewOriginalData(data.Segment)
		case *v1.SniffData_SpanAttachedEvent:
			if spanStream == nil {
				spanStream, err = f.attachedEventClient.Collect(context.Background())
				if err != nil {
					log.Logger.Errorf("open grpc stream error %v", err)
					return err
				}
			}
			stream = spanStream
			streamData = server_grpc.NewOriginalData(data.SpanAttachedEvent)
		default:
			// Payload types this forwarder does not handle are ignored.
			continue
		}

		err = stream.SendMsg(streamData)
		if err != nil {
			log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
			return err
		}
	}

	// Success path: close once here and surface the result to the caller.
	// Setting the flag first keeps the deferred cleanup from closing again.
	closed = true
	return closeStream(tracingStream, spanStream)
}