in flume-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java [121:197]
public Status process() throws EventDeliveryException {
  /*
   * Drains up to getMaxBatchSize() events from the channel (or fewer, if the channel runs dry or
   * getMaxBatchDurationMillis() elapses) and feeds each one to the morphline handler, all inside
   * one Flume channel transaction paired with one morphline transaction.
   *
   * Returns Status.BACKOFF when no event was taken or when a ChannelException occurred, and
   * Status.READY when at least one event was processed and both transactions committed.
   * Any Error is rethrown as-is; any other failure is wrapped in an EventDeliveryException.
   */
  int batchSize = getMaxBatchSize();
  long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
  Channel myChannel = getChannel();
  Transaction txn = myChannel.getTransaction();
  txn.begin();
  // Starts out true so that a failure inside handler.beginTransaction() does not trigger a
  // morphline rollback for a morphline transaction that was never opened.
  boolean isMorphlineTransactionCommitted = true;
  try {
    int numEventsTaken = 0;
    handler.beginTransaction();
    isMorphlineTransactionCommitted = false;

    // repeatedly take and process events from the Flume queue
    for (int i = 0; i < batchSize; i++) {
      Event event = myChannel.take();
      if (event == null) {
        break; // channel is drained for now
      }
      sinkCounter.incrementEventDrainAttemptCount();
      numEventsTaken++;
      if (LOGGER.isTraceEnabled() && LogPrivacyUtil.allowLogRawData()) {
        LOGGER.trace("Flume event arrived {}", event);
      }
      handler.process(event);
      // The time limit is checked after processing, so every batch handles at least one
      // available event even if the deadline has already passed.
      if (System.currentTimeMillis() >= batchEndTime) {
        break;
      }
    }

    // update metrics: a batch is counted as exactly one of empty, underflow or complete
    if (numEventsTaken == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (numEventsTaken < batchSize) {
      sinkCounter.incrementBatchUnderflowCount();
    } else {
      sinkCounter.incrementBatchCompleteCount();
    }

    // Commit the morphline side first; the flag records that it must not be rolled back
    // anymore should the subsequent Flume commit fail.
    handler.commitTransaction();
    isMorphlineTransactionCommitted = true;
    txn.commit();
    sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
    return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
  } catch (Throwable t) {
    // Ooops - need to rollback and back off
    LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " +
        myChannel.getName() + ". Exception follows.", t);
    sinkCounter.incrementEventWriteOrChannelFail(t);
    try {
      if (!isMorphlineTransactionCommitted) {
        handler.rollbackTransaction();
      }
    } catch (Throwable t2) {
      LOGGER.error("Morphline Sink " + getName() +
          ": Unable to rollback morphline transaction. Exception follows.", t2);
    } finally {
      // Always attempt the Flume rollback, even if the morphline rollback itself failed.
      try {
        txn.rollback();
      } catch (Throwable t4) {
        LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback Flume transaction. " +
            "Exception follows.", t4);
      }
    }

    if (t instanceof Error) {
      throw (Error) t; // rethrow original exception
    } else if (t instanceof ChannelException) {
      return Status.BACKOFF;
    } else {
      throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
    }
  } finally {
    txn.close();
  }
}