func()

in benchmark/producer.go [130:187]


func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
	p, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{bp.nameSrv}),
		producer.WithRetry(2),
	)

	if err != nil {
		rlog.Error("New Producer Error", map[string]interface{}{
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return
	}

	err = p.Start()

	defer p.Shutdown()

	topic, tag := bp.topic, "benchmark-producer"
	msgStr := buildMsg(bp.bodySize)

AGAIN:
	select {
	case <-exit:
		return
	default:
	}

	now := time.Now()
	r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))

	if err != nil {
		rlog.Error("Send Message Error", map[string]interface{}{
			rlog.LogKeyUnderlayError: err.Error(),
		})
		goto AGAIN
	}

	if r.Status == primitive.SendOK {
		atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
		atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
		currentRT := int64(time.Since(now) / time.Millisecond)
		atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
		prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
		for currentRT > prevRT {
			if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
				break
			}
			prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
		}
		goto AGAIN
	}
	rlog.Error("Send Message Error", map[string]interface{}{
		"topic":                  topic,
		"tag":                    tag,
		rlog.LogKeyUnderlayError: err.Error(),
	})
	goto AGAIN
}