perf/perf-producer.go (126 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package main import ( "context" "encoding/json" "time" "golang.org/x/time/rate" "github.com/bmizerany/perks/quantile" "github.com/spf13/cobra" log "github.com/sirupsen/logrus" "github.com/apache/pulsar-client-go/pulsar" ) // ProduceArgs define the parameters required by produce type ProduceArgs struct { Topic string Rate int BatchingTimeMillis int BatchingMaxSize uint BatchingNumMessages uint MessageSize int ProducerQueueSize int } func newProducerCommand() *cobra.Command { produceArgs := ProduceArgs{} cmd := &cobra.Command{ Use: "produce ", Short: "Produce on a topic and measure performance", Args: cobra.ExactArgs(1), Run: func(_ *cobra.Command, args []string) { stop := stopCh() if FlagProfile { RunProfiling(stop) } produceArgs.Topic = args[0] produce(&produceArgs, stop) }, } // add flags flags := cmd.Flags() flags.IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled") flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1, "Batching grouping time in millis") flags.UintVar(&produceArgs.BatchingMaxSize, "batching-max-size", 128, "Max size of a batch (in KB)") flags.UintVar(&produceArgs.BatchingNumMessages, "batching-num-messages", 1000, "Maximum number of messages permitted in a batch") flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size") flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size") return cmd } func produce(produceArgs *ProduceArgs, stop <-chan struct{}) { b, _ := json.MarshalIndent(clientArgs, "", " ") log.Info("Client config: ", string(b)) b, _ = json.MarshalIndent(produceArgs, "", " ") log.Info("Producer config: ", string(b)) client, err := NewClient() if err != nil { log.Fatal(err) } defer client.Close() producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: produceArgs.Topic, MaxPendingMessages: produceArgs.ProducerQueueSize, BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis), BatchingMaxSize: produceArgs.BatchingMaxSize * 1024, BatchingMaxMessages: produceArgs.BatchingNumMessages, }) if err != nil { log.Fatal(err) } defer producer.Close() ctx := context.Background() payload := make([]byte, produceArgs.MessageSize) ch := make(chan float64) limit := rate.Every(time.Duration(float64(time.Second) / float64(produceArgs.Rate))) rateLimiter := rate.NewLimiter(limit, produceArgs.Rate) go func(stopCh <-chan struct{}) { for { select { case <-stopCh: return default: } start := time.Now() if produceArgs.Rate > 0 { _ = rateLimiter.Wait(context.TODO()) } producer.SendAsync(ctx, &pulsar.ProducerMessage{ Payload: payload, }, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, e error) { if e != nil { log.WithError(e).Fatal("Failed to publish") } latency := time.Since(start).Seconds() ch <- latency }) } }(stop) // Print stats of the publish rate and latencies tick := time.NewTicker(10 * time.Second) defer tick.Stop() q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0) messagesPublished := 0 for { select { case <-stop: return case <-tick.C: messageRate := float64(messagesPublished) / float64(10) log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f -95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`, messageRate, messageRate*float64(produceArgs.MessageSize)/1024/1024*8, q.Query(0.5)*1000, q.Query(0.95)*1000, q.Query(0.99)*1000, q.Query(0.999)*1000, q.Query(1.0)*1000, ) q.Reset() messagesPublished = 0 case latency := <-ch: messagesPublished++ q.Insert(latency) } } }