in perf/perf-producer.go [80:169]
// produce publishes fixed-size messages to produceArgs.Topic until stop is
// signalled, and logs the publish rate and latency quantiles every ten seconds.
//
// A background goroutine sends messages asynchronously (throttled to
// produceArgs.Rate messages per second when Rate > 0); each send callback
// reports its latency, in seconds, to the stats loop running on the calling
// goroutine. Failing to create the client or producer, or a failed publish,
// terminates the process via log.Fatal.
//
// NOTE(review): both the send goroutine and the stats loop receive from stop,
// so it is presumably closed (not sent on) to signal shutdown — confirm
// against the caller.
func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
	// Interval between two stats reports; also the divisor for the rate.
	const statsInterval = 10 * time.Second

	// Marshal errors are deliberately ignored: the dumps are informational only.
	b, _ := json.MarshalIndent(clientArgs, "", " ")
	log.Info("Client config: ", string(b))
	b, _ = json.MarshalIndent(produceArgs, "", " ")
	log.Info("Producer config: ", string(b))

	client, err := NewClient()
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   produceArgs.Topic,
		MaxPendingMessages:      produceArgs.ProducerQueueSize,
		BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
		BatchingMaxSize:         produceArgs.BatchingMaxSize * 1024,
		BatchingMaxMessages:     produceArgs.BatchingNumMessages,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	ctx := context.Background()
	payload := make([]byte, produceArgs.MessageSize)
	ch := make(chan float64)

	// Only build a limiter when throttling is requested. Building it for
	// Rate == 0 would divide by zero and convert +Inf to a time.Duration.
	var rateLimiter *rate.Limiter
	if produceArgs.Rate > 0 {
		limit := rate.Every(time.Duration(float64(time.Second) / float64(produceArgs.Rate)))
		rateLimiter = rate.NewLimiter(limit, produceArgs.Rate)
	}

	go func(stopCh <-chan struct{}) {
		for {
			select {
			case <-stopCh:
				return
			default:
			}

			if rateLimiter != nil {
				// The error is ignored on purpose: ctx is never cancelled and
				// has no deadline, and a single token never exceeds the burst.
				_ = rateLimiter.Wait(ctx)
			}

			// Start the clock only after throttling so the reported latency
			// covers the publish itself, not the time spent rate limited.
			start := time.Now()
			producer.SendAsync(ctx, &pulsar.ProducerMessage{
				Payload: payload,
			}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, e error) {
				if e != nil {
					log.WithError(e).Fatal("Failed to publish")
				}
				latency := time.Since(start).Seconds()
				// Once the stats loop has returned nothing receives from ch,
				// so give up on stop instead of blocking forever.
				select {
				case ch <- latency:
				case <-stopCh:
				}
			})
		}
	}(stop)

	// Print stats of the publish rate and latencies
	tick := time.NewTicker(statsInterval)
	defer tick.Stop()
	q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0)
	messagesPublished := 0

	for {
		select {
		case <-stop:
			return
		case <-tick.C:
			messageRate := float64(messagesPublished) / statsInterval.Seconds()
			// Latencies are recorded in seconds; scale to milliseconds here.
			log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps -
Latency ms: 50%% %5.1f -95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
				messageRate,
				messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
				q.Query(0.5)*1000,
				q.Query(0.95)*1000,
				q.Query(0.99)*1000,
				q.Query(0.999)*1000,
				q.Query(1.0)*1000,
			)
			// Start a fresh measurement window.
			q.Reset()
			messagesPublished = 0
		case latency := <-ch:
			messagesPublished++
			q.Insert(latency)
		}
	}
}