func()

in plugins/forwarder/grpc/envoyalsv3/forwarder.go [95:139]


// Forward sends every Envoy ALS v3 message contained in batch over a single
// gRPC access-log stream that is opened for this call and closed before it
// returns.
//
// Events that do not carry an Envoy ALS v3 list are skipped, both when
// counting the messages that are about to be sent and when sending them.
// It returns the error from opening the stream, or the error from the first
// failed send (after closing the stream); otherwise it returns nil.
func (f *Forwarder) Forward(batch event.BatchEvents) error {
	f.eventReadySendCount.Add(float64(len(batch)))
	for _, e := range batch {
		// Use the same accessor and nil check as the send loop below. An
		// unchecked type assertion on e.GetData() yields a nil pointer for
		// events with a different payload, and dereferencing it would panic.
		list := e.GetEnvoyALSV3List()
		if list == nil {
			continue
		}
		f.streamingReadySendCount.Add(float64(len(list.Messages)))
	}

	// open stream
	timeRecord := f.forwardConnectTime.Start()
	stream, err := f.alsClient.StreamAccessLogs(context.Background())
	timeRecord.Stop()
	if err != nil {
		log.Logger.Errorf("open grpc stream error %v", err)
		return err
	}
	peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
	timeRecord = f.forwardSendTime.Start()

	for _, e := range batch {
		data := e.GetEnvoyALSV3List()
		if data == nil {
			continue
		}

		// send message
		for _, message := range data.Messages {
			if err := stream.SendMsg(server_grpc.NewOriginalData(message)); err != nil {
				log.Logger.Errorf("%s send envoy ALS v3 data error: %v", f.Name(), err)
				// The stream is closed on failure; the send timer is
				// intentionally left as in the original (not stopped here).
				f.closeStream(stream)
				return err
			}
		}

		// Count the event as finished only after all of its messages were sent.
		f.eventSendFinishedCount.Inc(peer)
	}

	timeRecord.Stop()

	// close stream
	timeRecord = f.forwardCloseTime.Start()
	f.closeStream(stream)
	timeRecord.Stop()
	return nil
}