in perf/perf-consumer.go [73:131]
func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
b, _ := json.MarshalIndent(clientArgs, "", " ")
log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(consumeArgs, "", " ")
log.Info("Consumer config: ", string(b))
client, err := NewClient()
if err != nil {
log.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize,
Type: consumeArgs.SubscriptionType,
SubscriptionMode: consumeArgs.SubscriptionMode,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// keep message stats
msgReceived := int64(0)
bytesReceived := int64(0)
// Print stats of the consume rate
tick := time.NewTicker(10 * time.Second)
defer tick.Stop()
for {
select {
case cm, ok := <-consumer.Chan():
if !ok {
return
}
msgReceived++
bytesReceived += int64(len(cm.Message.Payload()))
consumer.Ack(cm.Message)
case <-tick.C:
currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
msgRate := float64(currentMsgReceived) / float64(10)
bytesRate := float64(currentBytesReceived) / float64(10)
log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
msgRate, bytesRate*8/1024/1024)
case <-stop:
return
}
}
}