public synchronized Status process()

in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java [238:314]


  /**
   * Takes up to {@code batchSize} events from the channel inside a single channel
   * transaction, applies the operations produced for each event to the session and
   * flushes them.
   *
   * <p>The transaction is committed only if the flush reports no row errors. On any
   * failure the transaction is rolled back and the failure is rethrown; the
   * transaction is always closed before this method returns.
   *
   * @return {@code Status.BACKOFF} if the session still has pending operations or if
   *     no event was taken from the channel; {@code Status.READY} if at least one
   *     event was taken and the transaction was committed
   * @throws EventDeliveryException if a flushed operation reports a row error or any
   *     other checked exception occurs while processing the batch
   * @throws RuntimeException wrapping any {@code Error} or {@code RuntimeException}
   *     raised while processing the batch
   */
  public synchronized Status process() throws EventDeliveryException {
    if (session.hasPendingOperations()) {
      // If for whatever reason we have pending operations, refuse to process
      // more and tell the caller to try again a bit later. We don't want to
      // pile on the KuduSession.
      return Status.BACKOFF;
    }

    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();

    txn.begin();

    try {
      // Number of events taken from the channel in this transaction.
      long txnEventCount = 0;
      for (; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          // Channel is drained; flush whatever has been applied so far.
          break;
        }

        List<Operation> operations = operationsProducer.getOperations(event);
        for (Operation o : operations) {
          session.apply(o);
        }
      }

      logger.debug("Flushing {} events", txnEventCount);
      List<OperationResponse> responses = session.flush();
      if (responses != null) {
        for (OperationResponse response : responses) {
          // Throw an EventDeliveryException if at least one of the responses was
          // a row error. Row errors can occur for example when an event is inserted
          // into Kudu successfully but the Flume transaction is rolled back for some reason,
          // and a subsequent replay of the same Flume transaction leads to a
          // duplicate key error since the row already exists in Kudu.
          // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
          // is enabled in the config.
          if (response.hasRowError()) {
            throw new EventDeliveryException("Failed to flush one or more changes. " +
                "Transaction rolled back: " + response.getRowError().toString());
          }
        }
      }

      // Classify the batch for metrics before committing.
      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      txn.commit();

      if (txnEventCount == 0) {
        // Nothing was available; ask the caller to back off.
        return Status.BACKOFF;
      }

      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return Status.READY;

    } catch (Throwable e) {
      // Guard the rollback: if it fails as well, that failure must not replace
      // the original cause, which is what the caller needs to see.
      try {
        txn.rollback();
      } catch (RuntimeException rollbackFailure) {
        logger.error("Failed to roll back transaction.", rollbackFailure);
      }

      // Log the original failure exactly once, then rethrow it.
      String msg = "Failed to commit transaction. Transaction rolled back.";
      logger.error(msg, e);
      if (e instanceof Error || e instanceof RuntimeException) {
        throw new RuntimeException(e);
      }
      throw new EventDeliveryException(msg, e);
    } finally {
      txn.close();
    }
  }