in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [123:245]
/**
 * Command-line entry point of the Kafka proxy.
 * <p>
 * The method parses the proxy options, resolves the routing rules file, prepares the proxy
 * znodes and the replication peer (via {@code setupZookeeperZnodes} and
 * {@code checkForOrCreateReplicationPeer}), and finally assembles a list of {@code -D}
 * arguments that is handed to {@code HRegionServerCommandLine.doMain}.
 * <p>
 * Options understood by the proxy itself:
 * <ul>
 * <li>{@code -p/--peername} (required): name of the hbase peer</li>
 * <li>{@code -b/--kafkabrokers}: comma delimited Kafka brokers</li>
 * <li>{@code -f/--kafkaproperties}: properties file with the Kafka connection settings; takes
 * precedence over {@code -b}. One of {@code -f} or {@code -b} must be given.</li>
 * <li>{@code -r/--routerulesfile}: routing rules file, {@code kafka-route-rules.xml} when
 * omitted; looked up on the file system first and on the classpath second</li>
 * <li>{@code -a/--autopeer}: create the peer automatically</li>
 * <li>{@code -e/--enablepeer}: enable the peer on startup</li>
 * <li>{@code -z/--znode}: accepted by the parser (see the review note in the body)</li>
 * </ul>
 * Generic {@code -D} options are extracted beforehand by {@code GenericOptionsParser} and are
 * forwarded to the region server command line.
 * <p>
 * Invalid or missing input terminates the JVM with exit status {@code -1}.
 *
 * @param args raw command-line arguments
 * @throws Exception if ZooKeeper setup, peer setup or region server start-up fails
 */
public static void main(String[] args) throws Exception {
  Map<String, String> otherProps = new HashMap<>();

  Options options = new Options();
  // Intentionally optional: the Kafka connection may instead come from -f. The either/or
  // requirement is enforced after parsing, further down.
  options.addOption("b", "kafkabrokers", true, "Kafka Brokers (comma delimited)");
  options.addOption("r", "routerulesfile", true,
    "file that has routing rules (defaults to conf/kafka-route-rules.xml");
  options.addOption("f", "kafkaproperties", true,
    "Path to properties file that has the kafka connection properties");
  options.addRequiredOption("p", "peername", true, "Name of hbase peer");
  options.addOption("z", "znode", true,
    "root zode to use in zookeeper (defaults to /kafkaproxy)");
  options.addOption("a", "autopeer", false, "Create a peer automatically to the hbase cluster");
  options.addOption("e", "enablepeer", false, "enable peer on startup (defaults to false)");

  LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
  VersionInfo.logVersion();

  Configuration conf = HBaseConfiguration.create();

  // Start from an empty configuration so that it ends up holding only what was supplied
  // through generic options on the command line.
  Configuration commandLineConf = new Configuration();
  commandLineConf.clear();
  GenericOptionsParser parser = new GenericOptionsParser(commandLineConf, args);
  String[] restArgs = parser.getRemainingArgs();

  CommandLine commandLine = null;
  try {
    commandLine = new DefaultParser().parse(options, restArgs);
  } catch (ParseException e) {
    LOG.error("Could not parse: ", e);
    printUsageAndExit(options, -1);
  }

  if (!commandLine.hasOption('p')) {
    System.err.println("hbase peer id is required");
    System.exit(-1);
  }
  String peer = commandLine.getOptionValue('p');
  boolean createPeer = commandLine.hasOption('a');
  boolean enablePeer = commandLine.hasOption('e');

  // Rules file: a path on the file system wins, otherwise fall back to a classpath resource.
  String rulesFile =
    StringUtils.defaultIfBlank(commandLine.getOptionValue("r"), "kafka-route-rules.xml");
  if (!new File(rulesFile).exists()) {
    java.net.URL rulesResource = KafkaProxy.class.getClassLoader().getResource(rulesFile);
    if (rulesResource != null) {
      rulesFile = rulesResource.getFile();
    } else {
      System.err.println("Rules file " + rulesFile + " is invalid");
      System.exit(-1);
    }
  }
  otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE, rulesFile);

  // Kafka connection: a properties file takes precedence over an explicit broker list.
  if (commandLine.hasOption('f')) {
    otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES, commandLine.getOptionValue('f'));
  } else if (commandLine.hasOption('b')) {
    otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS, commandLine.getOptionValue('b'));
  } else {
    System.err.println("Kafka connection properites or brokers must be specified");
    System.exit(-1);
  }

  String zkQuorum = conf.get("hbase.zookeeper.quorum");
  if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
    System.err.println("hbase.zookeeper.quorum is not configured");
    System.exit(-1);
  }
  String zookeeperQ =
    buildZkConnectString(zkQuorum, conf.get("hbase.zookeeper.property.clientPort"));

  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);
  try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy)) {
    zk.start();
    // NOTE(review): the -z/--znode option is declared above but its value is not read in this
    // method; the root znode is fixed here. Confirm whether -z is meant to override it.
    String rootZnode = "/kafkaproxy";
    setupZookeeperZnodes(zk, rootZnode, peer);
    checkForOrCreateReplicationPeer(conf, zk, rootZnode, peer, createPeer, enablePeer);
  }

  @SuppressWarnings("unchecked")
  Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
    .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

  // Argument order: fixed defaults, overridable defaults that the command line did not set,
  // command-line -D values, proxy-specific properties, then pass-through arguments.
  List<String> allArgs = DEFAULT_PROPERTIES.keySet().stream()
    .map(argKey -> "-D" + argKey + "=" + DEFAULT_PROPERTIES.get(argKey))
    .collect(Collectors.toList());
  CAN_OVERRIDE_DEFAULT_PROPERTIES.keySet().stream()
    .filter(argKey -> commandLineConf.get(argKey, "").isEmpty())
    .map(argKey -> "-D" + argKey + "=" + CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey))
    .forEach(allArgs::add);
  for (Map.Entry<String, String> entry : commandLineConf) {
    allArgs.add("-D" + entry.getKey() + "=" + entry.getValue());
  }
  otherProps.forEach((key, value) -> allArgs.add("-D" + key + "=" + value));
  Arrays.stream(restArgs)
    .filter(arg -> arg.startsWith("-D") || arg.equals("start"))
    .forEach(allArgs::add);

  // The region server command line needs a "start" command; add it when the caller did not.
  if (allArgs.stream().noneMatch("start"::equalsIgnoreCase)) {
    allArgs.add("start");
  }

  new HRegionServerCommandLine(regionServerClass).doMain(allArgs.toArray(new String[0]));
}

/**
 * Builds a ZooKeeper connect string in which every quorum member carries a port.
 * <p>
 * Appending the port once to the whole quorum value would only qualify the last host of a
 * multi-host quorum, and would corrupt entries that already include a port.
 *
 * @param quorum comma separated quorum members, each either {@code host} or {@code host:port}
 * @param clientPort port appended to members that do not specify one
 * @return comma separated {@code host:port} list
 */
private static String buildZkConnectString(String quorum, String clientPort) {
  return Arrays.stream(quorum.split(","))
    .map(String::trim)
    .filter(member -> !member.isEmpty())
    // A ':' located after any closing ']' is a port separator; this keeps a bracketed IPv6
    // literal without a port from being mistaken for host:port.
    .map(member -> member.lastIndexOf(':') > member.lastIndexOf(']')
      ? member
      : member + ":" + clientPort)
    .collect(Collectors.joining(","));
}