public static void main()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [123:245]


  /**
   * Command-line entry point for the Kafka proxy.
   * <p>
   * Parses the proxy options, resolves the routing rules file (file system first, then the
   * classpath), prepares ZooKeeper state for the given peer via {@code setupZookeeperZnodes} and
   * {@code checkForOrCreateReplicationPeer}, and finally hands a synthesized argument list
   * ({@code -Dkey=value} entries plus {@code start}) to {@link HRegionServerCommandLine}.
   * <p>
   * Invalid or missing arguments are reported on stderr / the log and terminate the JVM with
   * exit code -1.
   * @param args raw command line; generic Hadoop options are consumed by
   *          {@link GenericOptionsParser}, the remainder is parsed as proxy options
   * @throws Exception if ZooKeeper setup or the region server command line fails
   */
  public static void main(String[] args) throws Exception {

    Map<String, String> otherProps = new HashMap<>();

    Options options = new Options();

    options.addRequiredOption("b", "kafkabrokers", true, "Kafka Brokers (comma delimited)");
    options.addOption("r", "routerulesfile", true,
      "file that has routing rules (defaults to conf/kafka-route-rules.xml)");
    options.addOption("f", "kafkaproperties", true,
      "Path to properties file that has the kafka connection properties");
    options.addRequiredOption("p", "peername", true, "Name of hbase peer");
    // NOTE(review): -z is accepted but its value is never read in this method; the root znode
    // used below is hard-coded. Confirm whether the option should be wired through.
    options.addOption("z", "znode", true,
      "root znode to use in zookeeper (defaults to /kafkaproxy)");
    options.addOption("a", "autopeer", false, "Create a peer automatically to the hbase cluster");
    options.addOption("e", "enablepeer", false, "enable peer on startup (defaults to false)");

    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
    VersionInfo.logVersion();

    Configuration conf = HBaseConfiguration.create();
    CommandLine commandLine = null;

    // Start from an empty configuration so it only ever holds what was passed on the command
    // line (e.g. -Dkey=value); those entries are forwarded to the region server further down.
    Configuration commandLineConf = new Configuration();
    commandLineConf.clear();

    GenericOptionsParser parser = new GenericOptionsParser(commandLineConf, args);
    String[] restArgs = parser.getRemainingArgs();

    try {
      commandLine = new DefaultParser().parse(options, restArgs);
    } catch (ParseException e) {
      LOG.error("Could not parse: ", e);
      printUsageAndExit(options, -1);
      // Never continue with a null commandLine, even if the helper above were to return.
      return;
    }

    if (!commandLine.hasOption('p')) {
      System.err.println("hbase peer id is required");
      System.exit(-1);
    }
    String peer = commandLine.getOptionValue('p');

    boolean createPeer = commandLine.hasOption('a');
    boolean enablePeer = commandLine.hasOption('e');

    String rulesFile =
      StringUtils.defaultIfBlank(commandLine.getOptionValue("r"), "kafka-route-rules.xml");

    // Prefer a file on disk; otherwise fall back to a classpath resource of the same name.
    if (!new File(rulesFile).exists()) {
      java.net.URL rulesUrl = KafkaProxy.class.getClassLoader().getResource(rulesFile);
      if (rulesUrl != null) {
        // NOTE(review): URL.getFile() returns the URL-encoded path (e.g. %20 for spaces);
        // confirm the consumer of this property copes with that.
        rulesFile = rulesUrl.getFile();
      } else {
        System.err.println("Rules file " + rulesFile + " is invalid");
        System.exit(-1);
      }
    }

    otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE, rulesFile);

    // A properties file takes precedence over the plain broker list.
    if (commandLine.hasOption('f')) {
      otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES, commandLine.getOptionValue('f'));
    } else if (commandLine.hasOption('b')) {
      otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS, commandLine.getOptionValue('b'));
    } else {
      // NOTE(review): -b is registered as required above, so parsing should already have
      // failed before this branch can be reached; kept as a defensive check.
      System.err.println("Kafka connection properties or brokers must be specified");
      System.exit(-1);
    }

    String zookeeperQ = buildZookeeperConnectString(conf.get("hbase.zookeeper.quorum"),
      conf.get("hbase.zookeeper.property.clientPort"));

    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);

    try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy)) {
      zk.start();
      String rootZnode = "/kafkaproxy";
      setupZookeeperZnodes(zk, rootZnode, peer);
      checkForOrCreateReplicationPeer(conf, zk, rootZnode, peer, createPeer, enablePeer);
    }

    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    // Collect into an explicit ArrayList: the list is appended to below, and
    // Collectors.toList() makes no guarantee that its result is mutable.
    List<String> allArgs = DEFAULT_PROPERTIES.keySet().stream()
      .map((argKey) -> ("-D" + argKey + "=" + DEFAULT_PROPERTIES.get(argKey)))
      .collect(Collectors.toCollection(java.util.ArrayList::new));

    // Overridable defaults are only emitted when the command line did not set the key itself.
    CAN_OVERRIDE_DEFAULT_PROPERTIES.keySet().stream()
      .filter((argKey) -> commandLineConf.get(argKey, "").isEmpty())
      .map((argKey) -> ("-D" + argKey + "=" + CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey)))
      .forEach(allArgs::add);

    for (Map.Entry<String, String> k : commandLineConf) {
      allArgs.add("-D" + k.getKey() + "=" + k.getValue());
    }

    otherProps.forEach((argKey, argValue) -> allArgs.add("-D" + argKey + "=" + argValue));

    Arrays.stream(restArgs).filter((arg) -> (arg.startsWith("-D") || arg.equals("start")))
      .forEach(allArgs::add);

    // Make sure the region server command line always receives a "start" command.
    if (allArgs.stream().noneMatch((arg) -> arg.equalsIgnoreCase("start"))) {
      allArgs.add("start");
    }

    String[] newArgs = allArgs.toArray(new String[0]);

    new HRegionServerCommandLine(regionServerClass).doMain(newArgs);
  }

  /**
   * Builds a ZooKeeper connect string from a comma-separated quorum and a client port.
   * <p>
   * The port is appended to every host that does not already contain a {@code ':'}; simply
   * concatenating {@code quorum + ":" + port} would attach the port to the last host only and
   * would corrupt entries that already carry one. Blank entries are skipped. When no port is
   * supplied, hosts are left untouched.
   * @param quorum comma-separated list of hosts, each optionally in {@code host:port} form
   * @param clientPort port to apply to hosts without one; may be {@code null} or blank
   * @return comma-separated {@code host[:port]} list
   * @throws IllegalArgumentException if {@code quorum} is {@code null} or blank
   */
  private static String buildZookeeperConnectString(String quorum, String clientPort) {
    if (quorum == null || quorum.trim().isEmpty()) {
      throw new IllegalArgumentException("hbase.zookeeper.quorum is not configured");
    }
    final String port = clientPort == null ? "" : clientPort.trim();
    return Arrays.stream(quorum.split(","))
      .map(String::trim)
      .filter((host) -> !host.isEmpty())
      .map((host) -> (port.isEmpty() || host.contains(":")) ? host : host + ":" + port)
      .collect(Collectors.joining(","));
  }