func()

in benchmark/consumer.go [134:178]


func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(bc.groupID),
		consumer.WithNameServer([]string{bc.nameSrv}),
	)
	if err != nil {
		panic("new push consumer error:" + err.Error())
	}

	selector := consumer.MessageSelector{}
	err = c.Subscribe(bc.topic, selector, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			atomic.AddInt64(&stati.receiveMessageTotal, 1)
			now := time.Now().UnixNano() / int64(time.Millisecond)
			b2cRT := now - msg.BornTimestamp
			atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
			s2cRT := now - msg.StoreTimestamp
			atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)

			for {
				old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
				if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
					break
				}
			}

			for {
				old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
				if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
					break
				}
			}
		}
		return consumer.ConsumeSuccess, nil
	})

	rlog.Info("Test Start", nil)
	c.Start()
	select {
	case <-exit:
		c.Shutdown()
		return
	}
}