in plugins/forwarder/grpc/nativetracing/forwarder.go [84:129]
func (f *Forwarder) Forward(batch event.BatchEvents) error {
var tracingStream agent.TraceSegmentReportService_CollectClient
var spanStream agent.SpanAttachedEventReportService_CollectClient
defer func() {
if err := closeStream(tracingStream, spanStream); err != nil {
log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
}
}()
var err error
var stream streaming
var streamData *server_grpc.OriginalData
for _, e := range batch {
switch data := e.GetData().(type) {
case *v1.SniffData_Segment:
if tracingStream == nil {
tracingStream, err = f.tracingClient.Collect(context.Background())
if err != nil {
log.Logger.Errorf("open grpc stream error %v", err)
return err
}
}
stream = tracingStream
streamData = server_grpc.NewOriginalData(data.Segment)
case *v1.SniffData_SpanAttachedEvent:
if spanStream == nil {
spanStream, err = f.attachedEventClient.Collect(context.Background())
if err != nil {
log.Logger.Errorf("open grpc stream error %v", err)
return err
}
}
stream = spanStream
streamData = server_grpc.NewOriginalData(data.SpanAttachedEvent)
default:
continue
}
err = stream.SendMsg(streamData)
if err != nil {
log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
return err
}
}
return closeStream(tracingStream, spanStream)
}