in plugins/forwarder/grpc/envoyalsv2/forwarder.go [95:139]
// Forward sends every Envoy ALS v2 message contained in batch over a newly
// opened access-log stream and closes the stream afterwards.
//
// Events that do not carry an Envoy ALS v2 list are skipped, both when the
// "ready to send" metrics are computed and when messages are sent.
//
// It returns the error from opening the stream, or the first error returned
// while sending a message (the stream is closed before returning in that
// case). It returns nil once all messages were handed to the stream.
func (f *Forwarder) Forward(batch event.BatchEvents) error {
	f.eventReadySendCount.Add(float64(len(batch)))
	for _, e := range batch {
		// The checked assertion guards against events with another payload
		// type or a nil list; dereferencing them unconditionally would panic.
		data, ok := e.GetData().(*v1.SniffData_EnvoyALSV2List)
		if !ok || data == nil || data.EnvoyALSV2List == nil {
			continue
		}
		f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV2List.Messages)))
	}

	// Open the stream and record how long connecting took.
	timeRecord := f.forwardConnectTime.Start()
	stream, err := f.alsClient.StreamAccessLogs(context.Background())
	timeRecord.Stop()
	if err != nil {
		log.Logger.Errorf("open grpc stream error %v", err)
		return err
	}

	// The peer host is passed to the per-event "send finished" counter.
	peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())

	timeRecord = f.forwardSendTime.Start()
	for _, e := range batch {
		data := e.GetEnvoyALSV2List()
		if data == nil {
			continue
		}
		// Send the messages of this event one by one; abort on the first failure.
		for _, message := range data.Messages {
			if err := stream.SendMsg(server_grpc.NewOriginalData(message)); err != nil {
				log.Logger.Errorf("%s send envoy ALS v2 data error: %v", f.Name(), err)
				f.closeStream(stream)
				return err
			}
		}
		f.eventSendFinishedCount.Inc(peer)
	}
	timeRecord.Stop()

	// Close the stream and record how long closing took.
	timeRecord = f.forwardCloseTime.Start()
	f.closeStream(stream)
	timeRecord.Stop()
	return nil
}