in tools/kafka-console-producer/kafka-console-producer.go [36:142]
func main() {
flag.Parse()
if *brokerList == "" {
printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
}
if *topic == "" {
printUsageErrorAndExit("no -topic specified")
}
if *verbose {
sarama.Logger = logger
}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
if *tlsEnabled {
tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
if err != nil {
printErrorAndExit(69, "Failed to create TLS config: %s", err)
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
}
switch *partitioner {
case "":
if *partition >= 0 {
config.Producer.Partitioner = sarama.NewManualPartitioner
} else {
config.Producer.Partitioner = sarama.NewHashPartitioner
}
case "hash":
config.Producer.Partitioner = sarama.NewHashPartitioner
case "random":
config.Producer.Partitioner = sarama.NewRandomPartitioner
case "manual":
config.Producer.Partitioner = sarama.NewManualPartitioner
if *partition == -1 {
printUsageErrorAndExit("-partition is required when partitioning manually")
}
default:
printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
}
message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
if *key != "" {
message.Key = sarama.StringEncoder(*key)
}
if *value != "" {
message.Value = sarama.StringEncoder(*value)
} else if stdinAvailable() {
bytes, err := io.ReadAll(os.Stdin)
if err != nil {
printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
}
message.Value = sarama.ByteEncoder(bytes)
} else {
printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
}
if *headers != "" {
var hdrs []sarama.RecordHeader
arrHdrs := strings.Split(*headers, ",")
for _, h := range arrHdrs {
if header := strings.Split(h, ":"); len(header) != 2 {
printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo")
} else {
hdrs = append(hdrs, sarama.RecordHeader{
Key: []byte(header[0]),
Value: []byte(header[1]),
})
}
}
if len(hdrs) != 0 {
message.Headers = hdrs
}
}
producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
if err != nil {
printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
}
defer func() {
if err := producer.Close(); err != nil {
logger.Println("Failed to close Kafka producer cleanly:", err)
}
}()
partition, offset, err := producer.SendMessage(message)
if err != nil {
printErrorAndExit(69, "Failed to produce message: %s", err)
} else if !*silent {
fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
}
if *showMetrics {
metrics.WriteOnce(config.MetricRegistry, os.Stderr)
}
}