in tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java [137:269]
/**
 * Declares every command-line option accepted by the console producer, parses {@code args}
 * against those declarations, and finally calls {@code checkArgs()}.
 *
 * <p>If parsing throws an {@code OptionException}, the exception message is handed to
 * {@code CommandLineUtils.printUsageAndExit} together with the parser.
 *
 * @param args the raw command-line arguments; they are also passed to the superclass constructor
 */
public ConsoleProducerOptions(String[] args) {
    super(args);

    // --- Connection / destination -------------------------------------------------------
    topicOpt = parser.accepts("topic", "REQUIRED: The topic name to produce messages to.")
            .withRequiredArg()
            .describedAs("topic")
            .ofType(String.class);
    bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
            .withRequiredArg()
            .describedAs("server to connect to")
            .ofType(String.class);

    // --- Send behaviour -----------------------------------------------------------------
    // Plain flag: it takes no argument.
    syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.");
    // The argument is optional here; the help text documents what a bare --compression-codec means.
    // A separating space is needed between the two sentences, otherwise the usage output
    // renders them glued together ("'zstd'.If specified ...").
    compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
                    "If specified without value, then it defaults to 'gzip'")
            .withOptionalArg()
            .describedAs("compression-codec")
            .ofType(String.class);
    // Shares its default (16 * 1024) with max-partition-memory-bytes declared further down;
    // per the help text, that option replaces this one when both are set.
    batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
                    "please note that this option will be replaced if max-partition-memory-bytes is also set")
            .withRequiredArg()
            .describedAs("size")
            .ofType(Integer.class)
            .defaultsTo(16 * 1024);
    messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
                    "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. " +
                    "This is the option to control `retries` in producer configs.")
            .withRequiredArg()
            .ofType(Integer.class)
            .defaultsTo(3);
    retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
                    "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
                    "This is the option to control `retry.backoff.ms` in producer configs.")
            .withRequiredArg()
            .ofType(Long.class)
            .defaultsTo(100L);
    sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
                    " a message will queue awaiting sufficient batch size. The value is given in ms. " +
                    "This is the option to control `linger.ms` in producer configs.")
            .withRequiredArg()
            .describedAs("timeout_ms")
            .ofType(Long.class)
            .defaultsTo(1000L);
    // Typed as String rather than a number; the default is the literal "-1".
    requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
            .withRequiredArg()
            .describedAs("request required acks")
            .ofType(String.class)
            .defaultsTo("-1");
    requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
            .withRequiredArg()
            .describedAs("request timeout ms")
            .ofType(Integer.class)
            .defaultsTo(1500);

    // --- Metadata, blocking and memory --------------------------------------------------
    // Default: 5 minutes expressed in milliseconds.
    metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
                    "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
                    "This is the option to control `metadata.max.age.ms` in producer configs.")
            .withRequiredArg()
            .describedAs("metadata expiration interval")
            .ofType(Long.class)
            .defaultsTo(5 * 60 * 1000L);
    // Default: 60 seconds expressed in milliseconds.
    maxBlockMsOpt = parser.accepts("max-block-ms",
                    "The max time that the producer will block for during a send request.")
            .withRequiredArg()
            .describedAs("max block on send")
            .ofType(Long.class)
            .defaultsTo(60 * 1000L);
    // Default: 32 MiB.
    maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
                    "The total memory used by the producer to buffer records waiting to be sent to the server. " +
                    "This is the option to control `buffer.memory` in producer configs.")
            .withRequiredArg()
            .describedAs("total memory in bytes")
            .ofType(Long.class)
            .defaultsTo(32 * 1024 * 1024L);
    // Default: 16 KiB.
    maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
                    "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
                    "will attempt to optimistically group them together until this size is reached. " +
                    "This is the option to control `batch.size` in producer configs.")
            .withRequiredArg()
            .describedAs("memory in bytes per partition")
            .ofType(Integer.class)
            .defaultsTo(16 * 1024);

    // --- Message reader -----------------------------------------------------------------
    // Defaults to the fully-qualified name of LineMessageReader.
    messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
                    "By default each line is read as a separate message.")
            .withRequiredArg()
            .describedAs("reader_class")
            .ofType(String.class)
            .defaultsTo(LineMessageReader.class.getName());
    // NOTE(review): the help text says "RECV" but names `send.buffer.bytes`; confirm which
    // socket buffer this option actually configures before rewording the message.
    socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size. " +
                    "This is the option to control `send.buffer.bytes` in producer configs.")
            .withRequiredArg()
            .describedAs("size")
            .ofType(Integer.class)
            .defaultsTo(1024 * 100);
    propertyOpt = parser.accepts("property",
                    "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader." +
                    "\nDefault properties include:" +
                    "\n parse.key=false" +
                    "\n parse.headers=false" +
                    "\n ignore.error=false" +
                    "\n key.separator=\\t" +
                    "\n headers.delimiter=\\t" +
                    "\n headers.separator=," +
                    "\n headers.key.separator=:" +
                    "\n null.marker= When set, any fields (key, value and headers) equal to this will be replaced by null" +
                    "\nDefault parsing pattern when:" +
                    "\n parse.headers=true and parse.key=true:" +
                    "\n \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
                    "\n parse.key=true:" +
                    "\n \"key\\tvalue\"" +
                    "\n parse.headers=true:" +
                    "\n \"h1:v1,h2:v2...\\tvalue\"")
            .withRequiredArg()
            .describedAs("prop")
            .ofType(String.class);
    // The option spec object is concatenated into the help text, so the wording shown to the
    // user depends on its toString() -- NOTE(review): confirm how it renders in the usage output.
    readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
            .withRequiredArg()
            .describedAs("config file")
            .ofType(String.class);

    // --- Producer overrides -------------------------------------------------------------
    producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
            .withRequiredArg()
            .describedAs("producer_prop")
            .ofType(String.class);
    // Same concatenation pattern as reader-config above, this time with producerPropertyOpt.
    producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
            .withRequiredArg()
            .describedAs("config file")
            .ofType(String.class);

    // Parse only after every option has been declared on the parser.
    try {
        options = parser.parse(args);
    } catch (OptionException e) {
        CommandLineUtils.printUsageAndExit(parser, e.getMessage());
    }

    checkArgs();
}