in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [131:263]
/**
 * Command-line entry point for the Kafka proxy.
 *
 * <p>Generic Hadoop options are consumed first by {@link GenericOptionsParser}; the remaining
 * arguments are parsed against the proxy-specific options declared below. The method then
 * resolves the routing rules file, sets up the ZooKeeper znodes and the replication peer, and
 * finally delegates to {@link HRegionServerCommandLine} with a synthesized list of
 * {@code -D} arguments followed by {@code start}.
 *
 * @param args raw command-line arguments
 * @throws Exception propagated from ZooKeeper/peer setup or from the region server command line
 */
public static void main(String[] args) throws Exception {
  Map<String,String> otherProps = new HashMap<>();

  Options options = new Options();
  options.addRequiredOption("b", "kafkabrokers", true,
    "Kafka Brokers (comma delimited)");
  options.addOption("r", "routerulesfile", true,
    "file that has routing rules (defaults to conf/kafka-route-rules.xml");
  options.addOption("f", "kafkaproperties", true,
    "Path to properties file that has the kafka connection properties");
  options.addRequiredOption("p", "peername", true,
    "Name of hbase peer");
  options.addOption("z", "znode", true,
    "root zode to use in zookeeper (defaults to /kafkaproxy)");
  options.addOption("a", "autopeer", false,
    "Create a peer automatically to the hbase cluster");
  options.addOption("e", "enablepeer", false,
    "enable peer on startup (defaults to false)");

  LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
  VersionInfo.logVersion();

  Configuration conf = HBaseConfiguration.create();

  // Start from an empty configuration so that only properties supplied on the
  // command line (e.g. -Dkey=value) end up in commandLineConf.
  Configuration commandLineConf = new Configuration();
  commandLineConf.clear();

  GenericOptionsParser parser = new GenericOptionsParser(commandLineConf, args);
  String[] restArgs = parser.getRemainingArgs();

  CommandLine commandLine = null;
  try {
    commandLine = new DefaultParser().parse(options, restArgs);
  } catch (ParseException e) {
    LOG.error("Could not parse: ", e);
    printUsageAndExit(options, -1);
    // Guard: if printUsageAndExit ever returns, do not continue with a null commandLine.
    return;
  }

  String peer = "";
  if (!commandLine.hasOption('p')) {
    System.err.println("hbase peer id is required");
    System.exit(-1);
  } else {
    peer = commandLine.getOptionValue('p');
  }

  boolean createPeer = commandLine.hasOption('a');
  // Fix: the enable flag is option 'e' (enablepeer); it was previously read from 'a'.
  boolean enablePeer = commandLine.hasOption('e');

  String rulesFile = StringUtils.defaultIfBlank(
    commandLine.getOptionValue("r"), "kafka-route-rules.xml");
  if (!new File(rulesFile).exists()) {
    // Not a file on disk: fall back to looking the name up on the classpath.
    java.net.URL rulesUrl = KafkaProxy.class.getClassLoader().getResource(rulesFile);
    if (rulesUrl != null) {
      rulesFile = rulesUrl.getFile();
    } else {
      System.err.println("Rules file " + rulesFile +
        " is invalid");
      System.exit(-1);
    }
  }
  otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE, rulesFile);

  // A Kafka properties file takes precedence over the broker list.
  if (commandLine.hasOption('f')) {
    otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES, commandLine.getOptionValue('f'));
  } else if (commandLine.hasOption('b')) {
    otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS, commandLine.getOptionValue('b'));
  } else {
    System.err.println("Kafka connection properites or brokers must be specified");
    System.exit(-1);
  }

  String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" +
    conf.get("hbase.zookeeper.property.clientPort");
  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);

  try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy)) {
    zk.start();
    // NOTE(review): the 'z'/znode option is declared above but is not read here; the root
    // znode is fixed. Confirm whether other configuration relies on this path before wiring
    // the option through.
    String rootZnode = "/kafkaproxy";
    setupZookeeperZnodes(zk, rootZnode, peer);
    checkForOrCreateReplicationPeer(conf, zk, rootZnode, peer, createPeer, enablePeer);
  }

  @SuppressWarnings("unchecked")
  Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
    .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

  // Build the argument list for the region server, in this order:
  // fixed defaults, overridable defaults not set on the command line, command-line
  // properties, proxy-specific properties, then pass-through -D/start arguments.
  List<String> allArgs = DEFAULT_PROPERTIES.keySet().stream()
    .map((argKey) -> ("-D" + argKey + "=" + DEFAULT_PROPERTIES.get(argKey)))
    .collect(Collectors.toList());

  allArgs.addAll(CAN_OVERRIDE_DEFAULT_PROPERTIES.keySet().stream()
    .filter((argKey) -> commandLineConf.get(argKey, "").equalsIgnoreCase(""))
    .map((argKey) -> ("-D" + argKey + "=" + CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey)))
    .collect(Collectors.toList()));

  for (Map.Entry<String,String> k : commandLineConf) {
    allArgs.add("-D" + k.getKey() + "=" + k.getValue());
  }

  for (Map.Entry<String,String> prop : otherProps.entrySet()) {
    allArgs.add("-D" + prop.getKey() + "=" + prop.getValue());
  }

  for (String arg : restArgs) {
    if (arg.startsWith("-D") || arg.equals("start")) {
      allArgs.add(arg);
    }
  }

  // Make sure the region server is told to start even if the caller omitted it.
  if (allArgs.stream().noneMatch((arg) -> arg.equalsIgnoreCase("start"))) {
    allArgs.add("start");
  }

  String[] newArgs = allArgs.toArray(new String[0]);
  new HRegionServerCommandLine(regionServerClass).doMain(newArgs);
}