in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java [131:169]
// Publishes the cells of every Mutation in `actions` to Kafka and returns only after
// all resulting sends have completed.
//
// Rows that are not Mutations are skipped. Each cell of a Mutation is wrapped in a
// CheckMutation, filtered through keep(), given its topics by addTopics(), dropped if
// it has no topics, expanded by processMutation(), serialized with toByteArray() and
// handed to the producer.
//
// actions - the rows to publish
// results - not read or written by this method
//
// Throws IOException if any send completes exceptionally (the underlying failure is
// attached as the cause).
// Throws InterruptedException if the calling thread is interrupted while waiting for
// a send to complete.
public void batch(final List<? extends Row> actions, Object[] results)
  throws IOException, InterruptedException {
  // A single buffer and encoder are shared by every toByteArray() call below; the
  // stream is sequential, so they are never used concurrently.
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);

  LOG.debug("got {} inputs ", actions.size());

  List<Future<RecordMetadata>> sends = new ArrayList<>();

  actions.stream().filter((row) -> row instanceof Mutation).map((row) -> (Mutation) row)
    .flatMap((mut) -> {
      boolean isDelete = mut instanceof Delete;
      // Walk every cell of every family in the mutation.
      return mut.getFamilyCellMap().values().stream()
        .flatMap((cells) -> cells.stream()).map((cell) -> {
          CheckMutation ret = new CheckMutation();
          ret.family = CellUtil.cloneFamily(cell);
          ret.qualifier = CellUtil.cloneQualifier(cell);
          ret.cell = cell;
          return ret;
        }).filter((check) -> keep(check)).map((check) -> addTopics(check))
        .filter((check) -> !CollectionUtils.isEmpty(check.topics))
        .flatMap((check) -> processMutation(check, isDelete).stream());
    }).map((event) -> toByteArray(bout, event, encoderUse))
    .forEach((item) -> sends.add(producer.send(item)));

  // Push everything that is still buffered out now, so the waits below do not sit
  // idle until the producer decides to send the batch on its own.
  this.producer.flush();

  // make sure the sends are done before returning
  for (Future<RecordMetadata> sendResult : sends) {
    try {
      // An InterruptedException raised here propagates to the caller unchanged.
      sendResult.get();
    } catch (java.util.concurrent.ExecutionException e) {
      LOG.error("Exception caught when getting result", e);
      Throwable cause = e.getCause() != null ? e.getCause() : e;
      throw new IOException("Failed to send mutation event to Kafka", cause);
    }
  }
}