func()

in plugins/forwarder/grpc/nativemeter/forwarder.go [113:141]


// handleMeterCollection sends one meter collection over a batch client stream.
//
// The stream is looked up in streamMap under a key built from the Service and
// ServiceInstance of the first meter in the collection. When no stream is
// cached under that key, a new one is opened through f.meterClient.CollectBatch,
// stored in streamMap, and handed to f.savePeerInstanceFromStream together
// with the service instance.
//
// A collection without any meter data is ignored and nil is returned. Errors
// from opening the stream or from sending the message are logged and returned
// unchanged.
func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
	collection := data.MeterCollection
	if len(collection.MeterData) < 1 {
		return nil
	}

	// Only the first meter is consulted to pick the stream for the whole collection.
	head := collection.MeterData[0]
	instance := head.ServiceInstance
	key := fmt.Sprintf("batch-stream-%s-%s", head.Service, instance)

	target := streamMap[key]
	if target == nil {
		// NOTE(review): presumably the cached peer pins this instance to a
		// previously used upstream — confirm against lb.WithLoadBalanceConfig.
		peer := f.loadCachedPeer(instance)
		ctx := lb.WithLoadBalanceConfig(context.Background(), instance, peer)

		opened, err := f.meterClient.CollectBatch(ctx)
		if err != nil {
			log.Logger.Errorf("open grpc stream error %v", err)
			return err
		}
		streamMap[key] = opened
		f.savePeerInstanceFromStream(opened, instance)
		target = opened
	}

	err := target.SendMsg(collection)
	if err != nil {
		log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
	}
	return err
}