in tools/kafka-producer-performance/main.go [219:316]
func main() {
flag.Parse()
if *brokers == "" {
printUsageErrorAndExit("-brokers is required")
}
if *topic == "" {
printUsageErrorAndExit("-topic is required")
}
if *messageLoad <= 0 {
printUsageErrorAndExit("-message-load must be greater than 0")
}
if *messageSize <= 0 {
printUsageErrorAndExit("-message-size must be greater than 0")
}
if *routines < 1 || *routines > *messageLoad {
printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
}
if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" {
printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol))
}
if *verbose {
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
config := sarama.NewConfig()
config.Net.MaxOpenRequests = *maxOpenRequests
config.Producer.MaxMessageBytes = *maxMessageBytes
config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
config.Producer.Timeout = *timeout
config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
config.Producer.Compression = parseCompression(*compression)
config.Producer.Flush.Frequency = *flushFrequency
config.Producer.Flush.Bytes = *flushBytes
config.Producer.Flush.Messages = *flushMessages
config.Producer.Flush.MaxMessages = *flushMaxMessages
config.Producer.Return.Successes = true
config.ClientID = *clientID
config.ChannelBufferSize = *channelBufferSize
config.Version = parseVersion(*version)
if *securityProtocol == "SSL" {
tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
if err != nil {
printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v",
*tlsClientCert, *tlsClientKey, err)
}
if *tlsRootCACerts != "" {
rootCAsBytes, err := os.ReadFile(*tlsRootCACerts)
if err != nil {
printErrorAndExit(69, "failed to read root CA certificates: %v", err)
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(rootCAsBytes) {
printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts)
}
// Use specific root CA set vs the host's set
tlsConfig.RootCAs = certPool
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
if err := config.Validate(); err != nil {
printErrorAndExit(69, "Invalid configuration: %s", err)
}
// Print out metrics periodically.
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
defer close(done)
t := time.Tick(5 * time.Second)
for {
select {
case <-t:
printMetrics(os.Stdout, config.MetricRegistry)
case <-ctx.Done():
return
}
}
}(ctx)
brokers := strings.Split(*brokers, ",")
if *sync {
runSyncProducer(*topic, *partition, *messageLoad, *messageSize, *routines,
config, brokers, *throughput)
} else {
runAsyncProducer(*topic, *partition, *messageLoad, *messageSize,
config, brokers, *throughput)
}
cancel()
<-done
}