in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java [55:100]
/**
 * Entry point: subscribes to the given Kafka topics and dumps each replicated
 * {@code HBaseKafkaEvent} (decoded from Avro) to the log as a string.
 *
 * Required arguments (commons-cli):
 *   -k / --kafkabrokers  comma-delimited Kafka broker list
 *   -t / --kafkatopics   comma-delimited topics to subscribe to
 *
 * Runs until externally terminated; a record that cannot be Avro-decoded is
 * fatal and aborts the tool with a {@link RuntimeException}.
 *
 * @param args command-line arguments described above
 */
public static void main(String[] args) {
  LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****");
  VersionInfo.logVersion();

  Options options = new Options();
  options.addRequiredOption("k", "kafkabrokers", true, "Kafka Brokers (comma delimited)");
  options.addRequiredOption("t", "kafkatopics", true,
    "Kafka Topics to subscribe to (comma delimited)");

  CommandLine commandLine = null;
  try {
    commandLine = new DefaultParser().parse(options, args);
  } catch (ParseException e) {
    LOG.error("Could not parse: ", e);
    // printUsageAndExit terminates the JVM, so commandLine is non-null below.
    printUsageAndExit(options, -1);
  }

  SpecificDatumReader<HBaseKafkaEvent> dreader =
    new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);

  String topic = commandLine.getOptionValue('t');
  Properties props = new Properties();
  props.put("bootstrap.servers", commandLine.getOptionValue('k'));
  props.put("group.id", "hbase kafka test tool");
  props.put("key.deserializer", ByteArrayDeserializer.class.getName());
  props.put("value.deserializer", ByteArrayDeserializer.class.getName());

  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic.split(",")));
    // Poll forever; this debugging tool is stopped externally (e.g. Ctrl-C).
    while (true) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
        try {
          HBaseKafkaEvent event = dreader.read(null, decoder);
          // Parameterized logging: the key/value string is only built when
          // DEBUG is enabled, instead of on every record.
          LOG.debug("key :{} value {}", Bytes.toString(record.key()), event);
        } catch (Exception e) {
          // Undecodable record is fatal: the topic carries data that is not
          // HBaseKafkaEvent-encoded. Include the topic list for diagnosis.
          throw new RuntimeException("Failed to decode record from topic(s) " + topic, e);
        }
      }
    }
  }
}