public static void main()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java [55:100]


  public static void main(String[] args) {
    LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****");
    VersionInfo.logVersion();

    Options options = new Options();
    options.addRequiredOption("k", "kafkabrokers", true, "Kafka Brokers " + "(comma delimited)");
    options.addRequiredOption("t", "kafkatopics", true,
      "Kafka Topics " + "to subscribe to (comma delimited)");
    CommandLine commandLine = null;

    try {
      commandLine = new DefaultParser().parse(options, args);
    } catch (ParseException e) {
      LOG.error("Could not parse: ", e);
      printUsageAndExit(options, -1);
    }

    SpecificDatumReader<HBaseKafkaEvent> dreader =
      new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);

    String topic = commandLine.getOptionValue('t');
    Properties props = new Properties();
    props.put("bootstrap.servers", commandLine.getOptionValue('k'));
    props.put("group.id", "hbase kafka test tool");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList()));

      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        Iterator<ConsumerRecord<byte[], byte[]>> it = records.iterator();
        while (it.hasNext()) {
          ConsumerRecord<byte[], byte[]> record = it.next();
          BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
          try {
            HBaseKafkaEvent event = dreader.read(null, decoder);
            LOG.debug("key :" + Bytes.toString(record.key()) + " value " + event);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      }
    }
  }