in plugins/receiver/grpc/envoymetricsv3/als_service.go [39:92]
func (m *MetricsService) StreamMetrics(stream v3.MetricsService_StreamMetricsServer) error {
messages := make(chan *v3.StreamMetricsMessage, m.limiterConfig.LimitCount*2)
limiter := buffer.NewLimiter(m.limiterConfig, func() int {
return len(messages)
})
var identity *v3.StreamMetricsMessage_Identifier
defer limiter.Stop()
limiter.Start(context.Background(), func() {
count := len(messages)
if count == 0 {
return
}
metricsMessages := make([]*v3.StreamMetricsMessage, 0)
for i := 0; i < count; i++ {
metricsMessages = append(metricsMessages, <-messages)
}
metricsMessages[0].Identifier = identity
d := &v1.SniffData{
Name: eventName,
Timestamp: time.Now().UnixNano() / 1e6,
Meta: nil,
Type: v1.SniffType_EnvoyMetricsV3Type,
Remote: true,
Data: &v1.SniffData_EnvoyMetricsV3List{
EnvoyMetricsV3List: &v1.EnvoyMetricsV3List{
Messages: metricsMessages,
},
},
}
m.receiveChannel <- d
})
var err1 error
for {
item, err := stream.Recv()
if err != nil {
err1 = err
break
}
if item.Identifier != nil {
identity = item.Identifier
}
messages <- item
limiter.Check()
}
if err1 != io.EOF {
return err1
}
return stream.SendAndClose(&v3.StreamMetricsResponse{})
}