in plugins/forwarder/grpc/nativemeter/forwarder.go [113:141]
// handleMeterCollection sends one meter collection over a client stream that
// is looked up in, or newly added to, streamMap.
//
// The stream is keyed by the Service and ServiceInstance of the first meter in
// the collection. When no stream is cached under that key, a new one is opened
// with f.meterClient.CollectBatch, stored in streamMap, and passed to
// f.savePeerInstanceFromStream together with the service instance.
//
// A nil data, a nil data.MeterCollection, or a collection without any meter
// data is a no-op and returns nil. Errors from opening the stream or from
// sending the message are logged and returned unchanged. Streams placed in
// streamMap are not closed here.
func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
	// Treat a missing payload like an empty one instead of panicking on the
	// nil dereference below.
	if data == nil || data.MeterCollection == nil || len(data.MeterCollection.MeterData) == 0 {
		return nil
	}

	// Only the first meter decides which stream the whole collection uses.
	// NOTE(review): a nil first entry would still panic here — confirm whether
	// producers can ever emit one.
	firstMeter := data.MeterCollection.MeterData[0]
	streamName := fmt.Sprintf("batch-stream-%s-%s", firstMeter.Service, firstMeter.ServiceInstance)

	stream := streamMap[streamName]
	if stream == nil {
		// The context carries the service instance and the peer cached for it;
		// presumably this lets the load balancer keep an instance on the same
		// backend — verify against the lb package.
		ctx := lb.WithLoadBalanceConfig(
			context.Background(),
			firstMeter.ServiceInstance,
			f.loadCachedPeer(firstMeter.ServiceInstance))
		curStream, err := f.meterClient.CollectBatch(ctx)
		if err != nil {
			log.Logger.Errorf("open grpc stream error %v", err)
			return err
		}
		// Cache the stream so later collections with the same key reuse it.
		streamMap[streamName] = curStream
		stream = curStream
		f.savePeerInstanceFromStream(curStream, firstMeter.ServiceInstance)
	}

	// The whole collection is sent as a single message.
	if err := stream.SendMsg(data.MeterCollection); err != nil {
		log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
		return err
	}
	return nil
}