in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java [139:183]
  public void batch(final List<? extends Row> actions, Object[] results)
      throws IOException, InterruptedException {
    // Reusable buffer and Avro encoder for serializing each outgoing event.
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);

    LOG.debug("got {} inputs", actions.size());

    List<Future<RecordMetadata>> sends = new ArrayList<>();

    // Fan each Mutation out to one event per Cell, keep only cells that match the
    // configured routing rules, serialize them, and hand them to the Kafka producer.
    actions.stream()
        .filter((row) -> row instanceof Mutation)
        .map((row) -> (Mutation) row)
        .flatMap((row) -> {
          Mutation mut = (Mutation) row;
          boolean isDelete = mut instanceof Delete;
          return mut.getFamilyCellMap().keySet().stream()
              .flatMap((family) -> mut.getFamilyCellMap().get(family).stream())
              .map((cell) -> {
                CheckMutation ret = new CheckMutation();
                ret.family = CellUtil.cloneFamily(cell);
                ret.qualifier = CellUtil.cloneQualifier(cell);
                ret.cell = cell;
                return ret;
              })
              .filter((check) -> keep(check))
              .map((check) -> addTopics(check))
              .filter((check) -> !CollectionUtils.isEmpty(check.topics))
              .flatMap((check) -> processMutation(check, isDelete).stream());
        })
        .map((event) -> toByteArray(bout, event, encoderUse))
        .forEach((item) -> sends.add(producer.send(item)));

    // Make sure the sends are done before returning.
    sends.forEach((sendResult) -> {
      try {
        sendResult.get();
      } catch (Exception e) {
        LOG.error("Exception caught when getting result", e);
        throw new RuntimeException(e);
      }
    });

    this.producer.flush();
  }
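
The method's reliability hinges on collecting every Future<RecordMetadata> returned by producer.send() and blocking on each one before flush() and the final return, so that a failed send surfaces to the caller rather than being lost. Below is a minimal standalone sketch of that send-and-await pattern; the SendAndAwaitExample class, broker address, topic name, and payloads are illustrative placeholders and not part of the proxy.

// Standalone sketch (assumed example, not part of KafkaTableForBridge):
// send byte[] payloads, then block until every send has been acknowledged.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class SendAndAwaitExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      List<Future<RecordMetadata>> sends = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        byte[] value = ("event-" + i).getBytes(StandardCharsets.UTF_8);
        // "example-topic" is a placeholder; the proxy derives topics from its routing rules.
        sends.add(producer.send(new ProducerRecord<>("example-topic", value)));
      }
      // Same pattern as batch(): block on each Future so a failed send is not silently dropped.
      for (Future<RecordMetadata> send : sends) {
        send.get();
      }
      producer.flush();
    }
  }
}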