in tools/kafka-producer-performance/main.go [365:424]
// runSyncProducer produces messageLoad messages of messageSize bytes to topic
// through a single sarama SyncProducer that is shared by `routines` goroutines.
//
// The load is split evenly between the goroutines; the last goroutine also
// takes the remainder of messageLoad/routines. When throughput is greater than
// zero, every goroutine waits for the next one-second tick after each
// `throughput` messages it has sent, so the limit applies per goroutine. A
// throughput of zero or less disables pacing.
//
// Failures to create the producer, to send a message, or to close the producer
// are reported through printErrorAndExit with code 69. The final metrics from
// config.MetricRegistry are printed to stdout before the producer is closed.
func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
	config *sarama.Config, brokers []string, throughput int) {
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		printErrorAndExit(69, "Failed to create producer: %s", err)
	}
	defer func() {
		// Print final metrics.
		printMetrics(os.Stdout, config.MetricRegistry)
		if err := producer.Close(); err != nil {
			printErrorAndExit(69, "Failed to close producer: %s", err)
		}
	}()

	// Pre-generate one batch of messages per goroutine so that message
	// construction is not part of the send loop.
	batches := make([][]*sarama.ProducerMessage, routines)
	for i := range batches {
		count := messageLoad / routines
		if i == routines-1 {
			// The last goroutine absorbs the remainder of the division.
			count += messageLoad % routines
		}
		batches[i] = generateMessages(topic, partition, count, messageSize)
	}

	var wg gosync.WaitGroup
	for _, batch := range batches {
		batch := batch // per-iteration copy for the closure (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()

			// tick stays nil when pacing is disabled, which skips the wait below.
			var tick <-chan time.Time
			if throughput > 0 {
				ticker := time.NewTicker(time.Second)
				defer ticker.Stop()
				tick = ticker.C
			}

			for idx, message := range batch {
				// err is scoped to this send: assigning to the enclosing
				// function's err from several goroutines would be a data race.
				if _, _, err := producer.SendMessage(message); err != nil {
					printErrorAndExit(69, "Failed to send message: %s", err)
				}
				// Each message is sent exactly once; after every `throughput`
				// sends, hold until the next tick to cap the rate.
				if tick != nil && (idx+1)%throughput == 0 {
					<-tick
				}
			}
		}()
	}
	wg.Wait()
}