public void batch()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java [131:169]


  public void batch(final List<? extends Row> actions, Object[] results)
    throws IOException, InterruptedException {

    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);

    LOG.debug("got {} inputs ", actions.size());

    List<Future<RecordMetadata>> sends = new ArrayList<>();

    actions.stream().filter((row) -> row instanceof Mutation).map((row) -> (Mutation) row)
      .flatMap((row) -> {
        Mutation mut = (Mutation) row;
        boolean isDelete = mut instanceof Delete;
        return mut.getFamilyCellMap().keySet().stream()
          .flatMap((family) -> mut.getFamilyCellMap().get(family).stream()).map((cell) -> {
            CheckMutation ret = new CheckMutation();
            ret.family = CellUtil.cloneFamily(cell);
            ret.qualifier = CellUtil.cloneQualifier(cell);
            ret.cell = cell;
            return ret;
          }).filter((check) -> keep(check)).map((check) -> addTopics(check))
          .filter((check) -> !CollectionUtils.isEmpty(check.topics))
          .flatMap((check) -> processMutation(check, isDelete).stream());
      }).map((event) -> toByteArray(bout, event, encoderUse))
      .forEach((item) -> sends.add(producer.send(item)));

    // make sure the sends are done before returning
    sends.stream().forEach((sendResult) -> {
      try {
        sendResult.get();
      } catch (Exception e) {
        LOG.error("Exception caught when getting result", e);
        throw new RuntimeException(e);
      }
    });

    this.producer.flush();
  }