public void batch()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java [139:183]


  /**
   * Turns the mutations in {@code actions} into Kafka messages, sends them through
   * {@code producer}, and returns only after every send has been waited on.
   *
   * <p>Per action the pipeline is:
   * <ol>
   *   <li>entries that are not {@code Mutation} instances are skipped;</li>
   *   <li>every cell of the mutation's family cell map is wrapped in a {@code CheckMutation}
   *       carrying a copy of its family, a copy of its qualifier, and the cell itself;</li>
   *   <li>candidates rejected by {@code keep}, or left with no topics after {@code addTopics},
   *       are dropped;</li>
   *   <li>{@code processMutation} expands each remaining candidate into events, each of which
   *       is converted by {@code toByteArray} and handed to {@code producer.send}.</li>
   * </ol>
   *
   * @param actions the rows to publish; only {@code Mutation} entries are considered
   * @param results not read or written by this method
   * @throws IOException declared by the signature; nothing in this body throws it directly
   * @throws InterruptedException if the calling thread is interrupted while waiting for a send
   *         to complete
   * @throws RuntimeException wrapping the underlying cause if any send fails
   */
  public void batch(final List<? extends Row> actions, Object[] results)
      throws IOException, InterruptedException {

    // the same buffer/encoder pair is passed to every toByteArray call of this batch
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);

    LOG.debug("got {} inputs ",actions.size());

    List<Future<RecordMetadata>> sends = new ArrayList<>();

    actions.stream()
      .filter((row)->row instanceof Mutation)
      .map((row)->(Mutation)row)
      .flatMap((mut)->{
        boolean isDelete = mut instanceof Delete;
        // walk the cell lists directly instead of looking each family up again by key
        return mut.getFamilyCellMap().values().stream()
          .flatMap((cells)->cells.stream())
          .map((cell)->{
            CheckMutation ret = new CheckMutation();
            ret.family=CellUtil.cloneFamily(cell);
            ret.qualifier=CellUtil.cloneQualifier(cell);
            ret.cell=cell;
            return ret;
          })
          .filter((check)->keep(check))
          .map((check)->addTopics(check))
          .filter((check)->!CollectionUtils.isEmpty(check.topics))
          .flatMap((check)->processMutation(check,isDelete).stream());
      })
      .map((event)->toByteArray(bout,event,encoderUse))
      .forEach((item)->sends.add(producer.send(item)));

    // Flush first: per the Kafka producer contract this pushes out anything still buffered and
    // blocks until those requests complete, so the get() calls below only collect outcomes
    // instead of each waiting on the producer's batching delay.
    this.producer.flush();

    // make sure the sends are done (and succeeded) before returning
    for (Future<RecordMetadata> sendResult : sends) {
      try {
        sendResult.get();
      } catch (InterruptedException e) {
        // propagate as declared rather than hiding the interruption inside a RuntimeException
        LOG.error("Exception caught when getting result",e);
        throw e;
      } catch (Exception e) {
        LOG.error("Exception caught when getting result",e);
        throw new RuntimeException(e);
      }
    }
  }