in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [162:189]
/**
 * Creates or updates a static topic by driving {@link UpdateStaticTopicSubCommand}
 * with a synthesized {@code mqadmin} command line.
 *
 * <p>The target is selected with {@code -c <cluster>} when {@code cluster} is non-null;
 * otherwise with {@code -b <broker1,broker2,...>} built from {@code brokers}.
 *
 * <p>Side effect: sets the JVM-wide system property {@code MixAll.NAMESRV_ADDR_PROPERTY}
 * to the name server address and does not restore the previous value.
 *
 * @param topic         name of the topic, passed as {@code -t}; must not be null
 * @param totalQueueNum total queue number, passed as {@code -qn}
 * @param brokers       broker names, joined with commas and passed as {@code -b};
 *                      only consulted (and then required to be non-empty) when
 *                      {@code cluster} is null
 * @param cluster       cluster name, passed as {@code -c}; may be null to target
 *                      {@code brokers} instead
 * @param nameservers   name server address(es), passed as {@code -n}; must not be null
 * @throws IllegalArgumentException if a required argument is missing
 * @throws Exception                if parsing or executing the command fails
 */
private static void createStaticTopicWithCommand(String topic, int totalQueueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
    // Fail fast with a descriptive message instead of an opaque NullPointerException
    // raised later from the parser, String.join or System.setProperty.
    if (topic == null) {
        throw new IllegalArgumentException("topic must not be null");
    }
    if (nameservers == null) {
        throw new IllegalArgumentException("nameservers must not be null");
    }

    // Exactly one target selector is used: the cluster wins when it is supplied.
    final String targetFlag;
    final String targetValue;
    if (cluster != null) {
        targetFlag = "-c";
        targetValue = cluster;
    } else {
        if (brokers == null || brokers.isEmpty()) {
            throw new IllegalArgumentException("either cluster or a non-empty set of brokers is required");
        }
        targetFlag = "-b";
        targetValue = String.join(",", brokers);
    }

    final String[] args = new String[]{
        targetFlag, targetValue,
        "-t", topic,
        "-qn", String.valueOf(totalQueueNum),
        "-n", nameservers
    };

    UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());

    // NOTE(review): presumably the admin tooling resolves the name server from this
    // property — confirm. It is process-global and stays set after this call returns.
    String namesrvAddr = commandLine.getOptionValue('n');
    System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
    cmd.execute(commandLine, options, null);
}