func()

in plugins/receiver/grpc/envoymetricsv3/als_service.go [39:92]


// StreamMetrics handles one client stream of Envoy metrics messages.
//
// Each received message is queued on a buffered channel whose capacity is
// twice m.limiterConfig.LimitCount. A limiter, which is given the current queue
// length, invokes the flush callback below; the callback drains whatever is
// queued at that moment, wraps it into a single v1.SniffData event and sends
// the event on m.receiveChannel.
//
// It returns the result of stream.SendAndClose when the client finishes the
// stream (stream.Recv reports io.EOF), and returns any other receive error
// unchanged. The limiter is stopped when the method returns.
func (m *MetricsService) StreamMetrics(stream v3.MetricsService_StreamMetricsServer) error {
	messages := make(chan *v3.StreamMetricsMessage, m.limiterConfig.LimitCount*2)
	limiter := buffer.NewLimiter(m.limiterConfig, func() int {
		return len(messages)
	})

	// identity remembers the most recent non-nil identifier seen on this
	// stream so that it can be re-attached to every flushed batch.
	// NOTE(review): identity is written by the receive loop and read by the
	// flush callback; if the limiter runs the callback on another goroutine
	// this access is unsynchronized — confirm against buffer.Limiter.
	var identity *v3.StreamMetricsMessage_Identifier

	defer limiter.Stop()
	limiter.Start(context.Background(), func() {
		count := len(messages)
		if count == 0 {
			return
		}
		// The batch size is known up front, so allocate the slice once
		// instead of letting append grow it repeatedly.
		metricsMessages := make([]*v3.StreamMetricsMessage, 0, count)
		for i := 0; i < count; i++ {
			metricsMessages = append(metricsMessages, <-messages)
		}
		// Only the first message of a batch carries the identifier
		// (presumably mirroring Envoy, which is documented to send it only
		// on the first message of a stream — TODO confirm consumer contract).
		metricsMessages[0].Identifier = identity

		d := &v1.SniffData{
			Name:      eventName,
			Timestamp: time.Now().UnixNano() / 1e6, // milliseconds since epoch
			Meta:      nil,
			Type:      v1.SniffType_EnvoyMetricsV3Type,
			Remote:    true,
			Data: &v1.SniffData_EnvoyMetricsV3List{
				EnvoyMetricsV3List: &v1.EnvoyMetricsV3List{
					Messages: metricsMessages,
				},
			},
		}
		m.receiveChannel <- d
	})

	for {
		item, err := stream.Recv()
		if err == io.EOF {
			// The client closed its side of the stream normally.
			return stream.SendAndClose(&v3.StreamMetricsResponse{})
		}
		if err != nil {
			return err
		}
		if item.Identifier != nil {
			identity = item.Identifier
		}
		messages <- item
		// Let the limiter decide whether the queue should be flushed now.
		limiter.Check()
	}
}