public Status process()

in flume-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java [121:197]


  /**
   * Takes up to one batch of events from this sink's channel and passes each of them to the
   * morphline handler, inside a Flume channel transaction paired with a handler transaction.
   *
   * <p>The batch ends as soon as {@code getMaxBatchSize()} events have been taken, the channel
   * returns {@code null}, or the deadline derived from {@code getMaxBatchDurationMillis()} has
   * passed (the deadline is checked after each processed event). On success the handler
   * transaction is committed first and the channel transaction second.
   *
   * <p>On any failure the handler transaction is rolled back (unless it had already been
   * committed) and the channel transaction is rolled back as well; exceptions thrown by either
   * rollback are logged and swallowed. The channel transaction is always closed before returning.
   *
   * @return {@code Status.BACKOFF} if no event was taken or a {@code ChannelException} was
   *     caught; {@code Status.READY} otherwise
   * @throws EventDeliveryException if processing fails with a throwable that is neither an
   *     {@code Error} (rethrown as is) nor a {@code ChannelException}; the original failure is
   *     attached as the cause
   */
  public Status process() throws EventDeliveryException {
    int batchSize = getMaxBatchSize();
    long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
    Channel myChannel = getChannel();
    Transaction txn = myChannel.getTransaction();
    txn.begin();
    // Starts out true so that a failure inside handler.beginTransaction() does not trigger a
    // handler rollback for a transaction that was never opened.
    boolean isMorphlineTransactionCommitted = true;
    try {
      int numEventsTaken = 0;
      handler.beginTransaction();
      isMorphlineTransactionCommitted = false;

      // repeatedly take and process events from the Flume queue
      for (int i = 0; i < batchSize; i++) {
        Event event = myChannel.take();
        if (event == null) {
          break;
        }
        sinkCounter.incrementEventDrainAttemptCount();
        numEventsTaken++;
        if (LOGGER.isTraceEnabled() && LogPrivacyUtil.allowLogRawData()) {
          LOGGER.trace("Flume event arrived {}", event);
        }

        handler.process(event);
        // Time-box the batch: stop early once the maximum batch duration has elapsed.
        if (System.currentTimeMillis() >= batchEndTime) {
          break;
        }
      }

      // update metrics: every batch is counted in exactly one of empty / underflow / complete,
      // so an empty batch is no longer also reported as an underflow
      if (numEventsTaken == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (numEventsTaken < batchSize) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }
      // Commit the handler before the channel; the flag keeps the catch block from rolling back
      // a handler transaction that has already been committed if txn.commit() fails.
      handler.commitTransaction();
      isMorphlineTransactionCommitted = true;
      txn.commit();
      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
      return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
    } catch (Throwable t) {
      // Ooops - need to rollback and back off
      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " +
          myChannel.getName() + ". Exception follows.", t);
      sinkCounter.incrementEventWriteOrChannelFail(t);
      try {
        if (!isMorphlineTransactionCommitted) {
          handler.rollbackTransaction();
        }
      } catch (Throwable t2) {
        LOGGER.error("Morphline Sink " + getName() +
            ": Unable to rollback morphline transaction. Exception follows.", t2);
      } finally {
        // Roll back the channel transaction even if the handler rollback itself failed.
        try {
          txn.rollback();
        } catch (Throwable t4) {
          LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback Flume transaction. " +
              "Exception follows.", t4);
        }
      }

      if (t instanceof Error) {
        throw (Error) t; // rethrow original exception
      } else if (t instanceof ChannelException) {
        return Status.BACKOFF;
      } else {
        throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
      }
    } finally {
      txn.close();
    }
  }