func runSyncProducer()

in tools/kafka-producer-performance/main.go [365:424]


func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
	config *sarama.Config, brokers []string, throughput int) {
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		printErrorAndExit(69, "Failed to create producer: %s", err)
	}
	defer func() {
		// Print final metrics.
		printMetrics(os.Stdout, config.MetricRegistry)
		if err := producer.Close(); err != nil {
			printErrorAndExit(69, "Failed to close producer: %s", err)
		}
	}()

	messages := make([][]*sarama.ProducerMessage, routines)
	for i := 0; i < routines; i++ {
		if i == routines-1 {
			messages[i] = generateMessages(topic, partition, messageLoad/routines+messageLoad%routines, messageSize)
		} else {
			messages[i] = generateMessages(topic, partition, messageLoad/routines, messageSize)
		}
	}

	var wg gosync.WaitGroup
	if throughput > 0 {
		for _, messages := range messages {
			messages := messages
			wg.Add(1)
			go func() {
				ticker := time.NewTicker(time.Second)
				for _, message := range messages {
					for i := 0; i < throughput; i++ {
						_, _, err = producer.SendMessage(message)
						if err != nil {
							printErrorAndExit(69, "Failed to send message: %s", err)
						}
					}
					<-ticker.C
				}
				ticker.Stop()
				wg.Done()
			}()
		}
	} else {
		for _, messages := range messages {
			messages := messages
			wg.Add(1)
			go func() {
				for _, message := range messages {
					_, _, err = producer.SendMessage(message)
					if err != nil {
						printErrorAndExit(69, "Failed to send message: %s", err)
					}
				}
				wg.Done()
			}()
		}
	}
	wg.Wait()
}