func consume()

in perf/perf-consumer.go [73:131]


func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
	b, _ := json.MarshalIndent(clientArgs, "", "  ")
	log.Info("Client config: ", string(b))
	b, _ = json.MarshalIndent(consumeArgs, "", "  ")
	log.Info("Consumer config: ", string(b))

	client, err := NewClient()

	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                             consumeArgs.Topic,
		SubscriptionName:                  consumeArgs.SubscriptionName,
		EnableBatchIndexAcknowledgment:    consumeArgs.EnableBatchIndexAck,
		EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize,
		Type:                              consumeArgs.SubscriptionType,
		SubscriptionMode:                  consumeArgs.SubscriptionMode,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	// keep message stats
	msgReceived := int64(0)
	bytesReceived := int64(0)

	// Print stats of the consume rate
	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case cm, ok := <-consumer.Chan():
			if !ok {
				return
			}
			msgReceived++
			bytesReceived += int64(len(cm.Message.Payload()))
			consumer.Ack(cm.Message)
		case <-tick.C:
			currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
			currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
			msgRate := float64(currentMsgReceived) / float64(10)
			bytesRate := float64(currentBytesReceived) / float64(10)

			log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
				msgRate, bytesRate*8/1024/1024)
		case <-stop:
			return
		}
	}
}