in benchmark/consumer.go [134:178]
// consumeMsg creates a push consumer for bc.groupID using bc.nameSrv as the
// name server, subscribes it to bc.topic and accumulates per-message
// statistics into stati. It blocks until a value is received from exit (or
// exit is closed) and then shuts the consumer down.
//
// For every delivered message the callback increments receiveMessageTotal,
// adds the born-to-consume and store-to-consume deltas to their running
// totals and raises the corresponding maxima. All writes to stati go through
// sync/atomic.
//
// It panics if the consumer cannot be created, subscribed or started, since
// no measurements can be taken in those cases.
func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(bc.groupID),
		consumer.WithNameServer([]string{bc.nameSrv}),
	)
	if err != nil {
		panic("new push consumer error:" + err.Error())
	}

	// raiseMax stores v into *addr if v is larger than the current value,
	// retrying the compare-and-swap until it succeeds or another writer has
	// already stored a value that is at least v.
	raiseMax := func(addr *int64, v int64) {
		for {
			old := atomic.LoadInt64(addr)
			if old >= v || atomic.CompareAndSwapInt64(addr, old, v) {
				return
			}
		}
	}

	selector := consumer.MessageSelector{}
	err = c.Subscribe(bc.topic, selector, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			atomic.AddInt64(&stati.receiveMessageTotal, 1)

			// Current wall-clock time in milliseconds; the message timestamps
			// are presumably in the same unit (verify against primitive.MessageExt).
			now := time.Now().UnixNano() / int64(time.Millisecond)

			b2cRT := now - msg.BornTimestamp
			atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)

			s2cRT := now - msg.StoreTimestamp
			atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)

			raiseMax(&stati.born2ConsumerMaxRT, b2cRT)
			raiseMax(&stati.store2ConsumerMaxRT, s2cRT)
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		// Without a subscription the callback never runs and the benchmark
		// would silently report zeros.
		panic("subscribe error:" + err.Error())
	}

	rlog.Info("Test Start", nil)
	if err = c.Start(); err != nil {
		panic("start push consumer error:" + err.Error())
	}

	// Block until the caller signals that the benchmark is over.
	<-exit
	if err = c.Shutdown(); err != nil {
		rlog.Error("shutdown push consumer error", map[string]interface{}{
			"error": err.Error(),
		})
	}
}