public Status process()

in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [326:387]


  /**
   * Takes up to {@code batchSize} events from the channel inside one channel
   * transaction, runs each through the serializer, and passes the collected
   * actions and increments to {@code putEventsAndCommit} together with the
   * transaction.
   *
   * <p>Sink counters are updated to reflect whether the batch was empty,
   * partially filled (underflow) or complete, and how many events were
   * attempted. On any failure the transaction is rolled back, the failure is
   * logged once and counted, and the cause is rethrown. The transaction is
   * always closed before this method returns or throws.
   *
   * @return {@code Status.BACKOFF} if the very first take from the channel
   *         yielded no event, otherwise {@code Status.READY}
   * @throws EventDeliveryException if a checked exception occurs while
   *         draining or writing the batch; {@code RuntimeException}s and
   *         {@code Error}s are rethrown as-is
   */
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    List<Row> actions = new LinkedList<>();
    List<Increment> incs = new LinkedList<>();
    try {
      txn.begin();

      if (serializer instanceof BatchAware) {
        ((BatchAware) serializer).onBatchStart();
      }

      long i = 0;
      for (; i < batchSize; i++) {
        Event event = channel.take();
        if (event == null) {
          // Channel ran dry: back off only if nothing at all was taken.
          if (i == 0) {
            status = Status.BACKOFF;
            sinkCounter.incrementBatchEmptyCount();
          } else {
            sinkCounter.incrementBatchUnderflowCount();
          }
          break;
        } else {
          serializer.initialize(event, columnFamily);
          actions.addAll(serializer.getActions());
          incs.addAll(serializer.getIncrements());
        }
      }
      if (i == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      }
      sinkCounter.addToEventDrainAttemptCount(i);

      // Called even for an empty batch so the transaction opened above is
      // handed off together with whatever was collected.
      putEventsAndCommit(actions, incs, txn);

    } catch (Throwable e) {
      try {
        txn.rollback();
      } catch (Exception e2) {
        // Keep going: the original failure is the one that must be reported.
        logger.error("Exception in rollback. Rollback might not have been " +
            "successful.", e2);
      }
      // Log the failure exactly once; the rethrow below carries the same
      // cause, so repeating the log entry per branch only adds noise.
      logger.error("Failed to commit transaction." +
          "Transaction rolled back.", e);
      sinkCounter.incrementEventWriteOrChannelFail(e);
      // Unchecked throwables are rethrown unchanged; only checked exceptions
      // are wrapped in the declared EventDeliveryException.
      if (e instanceof Error) {
        throw (Error) e;
      }
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      }
      throw new EventDeliveryException("Failed to commit transaction." +
          "Transaction rolled back.", e);
    } finally {
      txn.close();
    }
    return status;
  }