func main()

in tools/kafka-producer-performance/main.go [219:316]


// main is the entry point of the producer performance tool. It parses and
// validates the command-line flags, builds a sarama configuration from them
// (optionally enabling TLS), starts a goroutine that prints the metric
// registry every five seconds, runs either the sync or the async producer,
// and finally stops the metrics goroutine and waits for it to finish.
func main() {
	flag.Parse()

	// Flag validation. Each failure is reported through
	// printUsageErrorAndExit (presumably terminating the process — defined
	// elsewhere in this file).
	if *brokers == "" {
		printUsageErrorAndExit("-brokers is required")
	}
	if *topic == "" {
		printUsageErrorAndExit("-topic is required")
	}
	if *messageLoad <= 0 {
		printUsageErrorAndExit("-message-load must be greater than 0")
	}
	if *messageSize <= 0 {
		printUsageErrorAndExit("-message-size must be greater than 0")
	}
	if *routines < 1 || *routines > *messageLoad {
		printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
	}
	if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" {
		printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol))
	}
	if *verbose {
		// Route sarama's internal logging to stderr only when asked to.
		sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
	}

	// Translate the flags into a sarama configuration.
	config := sarama.NewConfig()

	config.Net.MaxOpenRequests = *maxOpenRequests
	config.Producer.MaxMessageBytes = *maxMessageBytes
	config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
	config.Producer.Timeout = *timeout
	config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
	config.Producer.Compression = parseCompression(*compression)
	config.Producer.Flush.Frequency = *flushFrequency
	config.Producer.Flush.Bytes = *flushBytes
	config.Producer.Flush.Messages = *flushMessages
	config.Producer.Flush.MaxMessages = *flushMaxMessages
	config.Producer.Return.Successes = true
	config.ClientID = *clientID
	config.ChannelBufferSize = *channelBufferSize
	config.Version = parseVersion(*version)

	if *securityProtocol == "SSL" {
		tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
		if err != nil {
			printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v",
				*tlsClientCert, *tlsClientKey, err)
		}

		if *tlsRootCACerts != "" {
			rootCAsBytes, err := os.ReadFile(*tlsRootCACerts)
			if err != nil {
				printErrorAndExit(69, "failed to read root CA certificates: %v", err)
			}
			certPool := x509.NewCertPool()
			if !certPool.AppendCertsFromPEM(rootCAsBytes) {
				printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts)
			}
			// Use specific root CA set vs the host's set
			tlsConfig.RootCAs = certPool
		}

		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}

	if err := config.Validate(); err != nil {
		printErrorAndExit(69, "Invalid configuration: %s", err)
	}

	// Print out metrics periodically. The goroutine closes done when it
	// returns so main can wait for it after cancelling ctx.
	done := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	go func(ctx context.Context) {
		defer close(done)
		// Use an explicit Ticker rather than time.Tick so the ticker can be
		// stopped when this goroutine exits; time.Tick provides no way to
		// release the underlying ticker.
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				printMetrics(os.Stdout, config.MetricRegistry)
			case <-ctx.Done():
				return
			}
		}
	}(ctx)

	// The -brokers flag is a comma-separated list; a distinct local name
	// avoids shadowing the flag variable itself.
	brokerList := strings.Split(*brokers, ",")
	if *sync {
		runSyncProducer(*topic, *partition, *messageLoad, *messageSize, *routines,
			config, brokerList, *throughput)
	} else {
		runAsyncProducer(*topic, *partition, *messageLoad, *messageSize,
			config, brokerList, *throughput)
	}

	// Stop the metrics goroutine and wait until it has actually exited.
	cancel()
	<-done
}