public static void main()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [131:263]


  public static void main(String[] args) throws Exception {

    Map<String,String> otherProps = new HashMap<>();

    Options options = new Options();

    options.addRequiredOption("b", "kafkabrokers", true,
      "Kafka Brokers (comma delimited)");
    options.addOption("r", "routerulesfile", true,
      "file that has routing rules (defaults to conf/kafka-route-rules.xml");
    options.addOption("f", "kafkaproperties", true,
      "Path to properties file that has the kafka connection properties");
    options.addRequiredOption("p", "peername", true,
        "Name of hbase peer");
    options.addOption("z", "znode", true,
        "root zode to use in zookeeper (defaults to /kafkaproxy)");
    options.addOption("a", "autopeer", false,
        "Create a peer automatically to the hbase cluster");
    options.addOption("e", "enablepeer", false,
        "enable peer on startup (defaults to false)");

    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
    VersionInfo.logVersion();

    Configuration conf = HBaseConfiguration.create();
    CommandLine commandLine = null;

    Configuration commandLineConf = new Configuration();
    commandLineConf.clear();

    GenericOptionsParser parser = new GenericOptionsParser(commandLineConf, args);
    String[] restArgs = parser.getRemainingArgs();

    try {
      commandLine = new DefaultParser().parse(options, restArgs);
    } catch (ParseException e) {
      LOG.error("Could not parse: ", e);
      printUsageAndExit(options, -1);
    }


    String peer="";
    if (!commandLine.hasOption('p')){
      System.err.println("hbase peer id is required");
      System.exit(-1);
    } else {
      peer = commandLine.getOptionValue('p');
    }

    boolean createPeer = false;
    boolean enablePeer = false;

    if (commandLine.hasOption('a')){
      createPeer=true;
    }

    if (commandLine.hasOption("a")){
      enablePeer=true;
    }

    String rulesFile = StringUtils.defaultIfBlank(
            commandLine.getOptionValue("r"),"kafka-route-rules.xml");

    if (!new File(rulesFile).exists()){
      if (KafkaProxy.class.getClassLoader().getResource(rulesFile)!=null){
        rulesFile = KafkaProxy.class.getClassLoader().getResource(rulesFile).getFile();
      } else {
        System.err.println("Rules file " + rulesFile +
            " is invalid");
        System.exit(-1);
      }
    }

    otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE,rulesFile);

    if (commandLine.hasOption('f')){
      otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,commandLine.getOptionValue('f'));
    } else if (commandLine.hasOption('b')){
      otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS,commandLine.getOptionValue('b'));
    } else {
      System.err.println("Kafka connection properites or brokers must be specified");
      System.exit(-1);
    }

    String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" +
        conf.get("hbase.zookeeper.property.clientPort");

    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);

    try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy);
    ) {
      zk.start();
      String rootZnode = "/kafkaproxy";
      setupZookeeperZnodes(zk,rootZnode,peer);
      checkForOrCreateReplicationPeer(conf,zk,rootZnode,peer,createPeer,enablePeer);
    }

    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    List<String> allArgs = DEFAULT_PROPERTIES.keySet().stream()
        .map((argKey)->("-D"+argKey+"="+ DEFAULT_PROPERTIES.get(argKey)))
        .collect(Collectors.toList());

    allArgs.addAll(CAN_OVERRIDE_DEFAULT_PROPERTIES.keySet().stream()
            .filter((argKey)->commandLineConf.get(argKey,"").equalsIgnoreCase(""))
            .map((argKey)->("-D"+argKey+"="+ CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey)))
            .collect(Collectors.toList()));

    for (Map.Entry<String,String> k : commandLineConf){
      allArgs.add("-D"+k.getKey()+"="+k.getValue());
    }

    otherProps.keySet().stream()
        .map((argKey)->("-D"+argKey+"="+ otherProps.get(argKey)))
        .forEach((item)->allArgs.add(item));

    Arrays.stream(restArgs)
        .filter((arg)->(arg.startsWith("-D")||arg.equals("start")))
        .forEach((arg)->allArgs.add(arg));

    // is start there?
    if (allArgs.stream()
            .filter((arg)->arg.equalsIgnoreCase("start")).count() < 1){
      allArgs.add("start");
    }

    String[] newArgs=new String[allArgs.size()];
    allArgs.toArray(newArgs);

    new HRegionServerCommandLine(regionServerClass).doMain(newArgs);
  }