func()

in plugins/receiver/grpc/envoyalsv2/als_service.go [65:147]


// StreamAccessLogs handles one ALS v2 client stream.
//
// Raw message payloads read from the stream are queued in a buffered channel
// and handed to a limiter-driven flush callback, which drains the queue into a
// single v1.SniffData event and pushes it onto m.receiveChannel.
//
// The identifier is taken from the first message received on the stream and is
// written into the first message of every flushed batch.
//
// It returns an error when the first message cannot be unmarshalled or carries
// no identifier, or when RecvMsg fails with anything other than io.EOF. On
// io.EOF it returns the result of stream.SendAndClose.
func (m *AlsService) StreamAccessLogs(stream v2.AccessLogService_StreamAccessLogsServer) error {
	messages := make(chan []byte, m.limiterConfig.LimitCount*2)
	limiter := buffer.NewLimiter(m.limiterConfig, func() int {
		return len(messages)
	})
	peer := grpc.GetPeerHostFromStreamContext(stream.Context())
	m.activeStreamingCount.Inc(alsVersion, peer)
	defer m.activeStreamingCount.Dec(alsVersion, peer)

	// NOTE(review): identity is written by the receive loop below and read by
	// the flush callback; if the limiter invokes the callback on another
	// goroutine this access is unsynchronised — confirm against buffer.Limiter.
	var identity *v2.StreamAccessLogsMessage_Identifier

	defer limiter.Stop()
	limiter.Start(context.Background(), func() {
		// Nothing can be flushed until the stream identifier is known.
		if identity == nil {
			return
		}
		count := len(messages)
		if count == 0 {
			return
		}
		// Drain exactly the number of messages that were queued when the flush
		// started, so the callback does not block waiting for new arrivals.
		logsMessages := make([][]byte, 0, count)
		for i := 0; i < count; i++ {
			logsMessages = append(logsMessages, <-messages)
		}

		// Re-encode the first message of the batch with the stream identifier.
		firstAls := new(v2.StreamAccessLogsMessage)
		if err := proto.Unmarshal(logsMessages[0], firstAls); err != nil {
			log.Logger.Warnf("could not unmarshal als message: %v", err)
			return
		}
		firstAls.Identifier = identity
		marshal, err := proto.Marshal(firstAls)
		if err != nil {
			// Do not forward a nil/partial payload as the head of the batch.
			log.Logger.Warnf("could not marshal als message: %v", err)
			return
		}
		logsMessages[0] = marshal

		d := &v1.SniffData{
			Name:      eventName,
			Timestamp: time.Now().UnixNano() / 1e6, // milliseconds since epoch
			Meta:      nil,
			Type:      v1.SniffType_EnvoyALSV2Type,
			Remote:    true,
			Data: &v1.SniffData_EnvoyALSV2List{
				EnvoyALSV2List: &v1.EnvoyALSV2List{
					Messages: logsMessages,
				},
			},
		}
		m.streamingToEventCount.Inc(alsVersion, peer)
		m.receiveChannel <- d
	})

	for {
		data := grpc.NewOriginalData(nil)
		if err := stream.RecvMsg(data); err != nil {
			// NOTE(review): this counter is also incremented on a clean io.EOF —
			// confirm that is intended.
			m.streamingFailedCount.Inc(alsVersion, peer)
			if err != io.EOF {
				return err
			}
			return stream.SendAndClose(&v2.StreamAccessLogsResponse{})
		}
		if identity == nil {
			// Only the first message is decoded here, to capture the identifier.
			item := new(v2.StreamAccessLogsMessage)
			if err := proto.Unmarshal(data.Content, item); err != nil {
				return fmt.Errorf("could not umarshal first message, %v", err)
			}
			if item.Identifier == nil {
				return fmt.Errorf("could not found identity in message")
			}
			identity = item.Identifier
		}

		m.streamingCount.Inc(alsVersion, peer)
		messages <- data.Content
		limiter.Check()
	}
}