private void putEventsAndCommit()

in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [389:437]


  private void putEventsAndCommit(final List<Row> actions,
                                  final List<Increment> incs, Transaction txn) throws Exception {

    privilegedExecutor.execute((PrivilegedExceptionAction<Void>) () -> {
      final List<Mutation> mutations = new ArrayList<>(actions.size());
      for (Row r : actions) {
        if (r instanceof Put) {
          ((Put) r).setDurability(enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
        }
        // Newer versions of HBase - Increment implements Row.
        if (r instanceof Increment) {
          ((Increment) r).setDurability(enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
        }
        if (r instanceof Mutation) {
          mutations.add((Mutation)r);
        } else {
          logger.warn("dropping row " + r + " since it is not an Increment or Put");
        }
      }
      table.mutate(mutations);
      table.flush();
      return null;
    });

    privilegedExecutor.execute((PrivilegedExceptionAction<Void>) () -> {

      List<Increment> processedIncrements;
      if (batchIncrements) {
        processedIncrements = coalesceIncrements(incs);
      } else {
        processedIncrements = incs;
      }

      // Only used for unit testing.
      if (debugIncrCallback != null) {
        debugIncrCallback.onAfterCoalesce(processedIncrements);
      }

      for (final Increment i : processedIncrements) {
        i.setDurability(enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
        table.mutate(i);
      }
      table.flush();
      return null;
    });

    txn.commit();
    sinkCounter.addToEventDrainSuccessCount(actions.size());
  }