in benchmark/producer.go [130:187]
func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{bp.nameSrv}),
producer.WithRetry(2),
)
if err != nil {
rlog.Error("New Producer Error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
return
}
err = p.Start()
defer p.Shutdown()
topic, tag := bp.topic, "benchmark-producer"
msgStr := buildMsg(bp.bodySize)
AGAIN:
select {
case <-exit:
return
default:
}
now := time.Now()
r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))
if err != nil {
rlog.Error("Send Message Error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
goto AGAIN
}
if r.Status == primitive.SendOK {
atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
currentRT := int64(time.Since(now) / time.Millisecond)
atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
for currentRT > prevRT {
if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
break
}
prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
}
goto AGAIN
}
rlog.Error("Send Message Error", map[string]interface{}{
"topic": topic,
"tag": tag,
rlog.LogKeyUnderlayError: err.Error(),
})
goto AGAIN
}