func produce()

in perf/perf-producer.go [80:169]


// produce publishes fixed-size messages to produceArgs.Topic and logs the
// publish rate and publish-latency percentiles every 10 seconds until a
// receive on stop succeeds.
//
// Parameters:
//   - produceArgs: topic, batching, pending-queue, message-size and rate
//     settings. A Rate of 0 or less disables rate limiting.
//   - stop: shutdown signal; produce returns as soon as a receive on it
//     succeeds (either a sent value or a close).
//
// Failures to create the client or the producer, and any publish failure,
// are reported through log.Fatal.
func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
	// Marshalling errors are deliberately ignored: these dumps are purely
	// informational and must not prevent the run from starting.
	b, _ := json.MarshalIndent(clientArgs, "", "  ")
	log.Info("Client config: ", string(b))
	b, _ = json.MarshalIndent(produceArgs, "", "  ")
	log.Info("Producer config: ", string(b))

	client, err := NewClient()
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   produceArgs.Topic,
		MaxPendingMessages:      produceArgs.ProducerQueueSize,
		BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
		BatchingMaxSize:         produceArgs.BatchingMaxSize * 1024,
		BatchingMaxMessages:     produceArgs.BatchingNumMessages,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// done is closed when produce returns. Deferred calls run last-in
	// first-out, so it is closed before producer.Close and client.Close run.
	// It stops the sender goroutine and releases send callbacks that would
	// otherwise block forever on the unbuffered latencies channel once the
	// stats loop below is no longer receiving.
	done := make(chan struct{})
	defer close(done)

	ctx := context.Background()
	payload := make([]byte, produceArgs.MessageSize)
	latencies := make(chan float64)

	// Only build a limiter when a positive rate was requested; this avoids
	// dividing by zero when Rate is 0.
	var rateLimiter *rate.Limiter
	if produceArgs.Rate > 0 {
		interval := time.Duration(float64(time.Second) / float64(produceArgs.Rate))
		rateLimiter = rate.NewLimiter(rate.Every(interval), produceArgs.Rate)
	}

	// Sender: publishes until produce returns. It watches done rather than
	// stop so that the stats loop is the only receiver on stop and shutdown
	// works whether stop is closed or is sent a single value.
	go func() {
		for {
			select {
			case <-done:
				return
			default:
			}

			if rateLimiter != nil {
				// The context is never cancelled and a single token never
				// exceeds the burst, so Wait has no reason to fail here.
				_ = rateLimiter.Wait(context.TODO())
			}

			// Take the timestamp after throttling so the reported latency
			// covers the publish only, not the time spent rate limiting.
			start := time.Now()

			producer.SendAsync(ctx, &pulsar.ProducerMessage{
				Payload: payload,
			}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, e error) {
				if e != nil {
					log.WithError(e).Fatal("Failed to publish")
				}

				select {
				case latencies <- time.Since(start).Seconds():
				case <-done:
					// Stats loop has exited; drop the sample instead of
					// blocking the caller of this callback.
				}
			})
		}
	}()

	// Print stats of the publish rate and latencies
	const statsInterval = 10 * time.Second
	tick := time.NewTicker(statsInterval)
	defer tick.Stop()
	q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0)
	messagesPublished := 0

	for {
		select {
		case <-stop:
			return
		case <-tick.C:
			messageRate := float64(messagesPublished) / statsInterval.Seconds()
			// The embedded newline and tabs are kept so the emitted log text
			// is unchanged.
			log.Infof("Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - \n"+
				"\t\t\t\tLatency ms: 50%% %5.1f -95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f",
				messageRate,
				messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
				q.Query(0.5)*1000,
				q.Query(0.95)*1000,
				q.Query(0.99)*1000,
				q.Query(0.999)*1000,
				q.Query(1.0)*1000,
			)

			// Start a fresh measurement window.
			q.Reset()
			messagesPublished = 0
		case latency := <-latencies:
			messagesPublished++
			q.Insert(latency)
		}
	}
}