in flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java [240:276]
/**
 * Drains one batch of events from the channel inside a single channel transaction.
 *
 * <p>Steps, in order: begin a transaction on the channel, enable heart beats on all
 * writers if {@code timeToSendHeartBeat} was set (the flag is cleared atomically),
 * drain one batch via {@code drainOneBatch(channel)}, and commit. If the commit is not
 * reached, the transaction is rolled back. The transaction is closed in every case,
 * including when the rollback itself throws.
 *
 * @return {@code Status.READY} if at least one event was drained and committed;
 *         {@code Status.BACKOFF} if the batch was empty or the thread was interrupted
 * @throws EventDeliveryException wrapping any non-interrupt exception raised while
 *         enabling heart beats, draining, or committing; the failure is first recorded
 *         through {@code sinkCounter.incrementEventWriteOrChannelFail}
 */
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  boolean success = false;
  boolean interrupted = false;
  try {
    // 1 Enable Heart Beats
    if (timeToSendHeartBeat.compareAndSet(true, false)) {
      enableHeartBeatOnAllWriters();
    }
    // 2 Drain Batch
    int txnEventCount = drainOneBatch(channel);
    transaction.commit();
    success = true;
    // 3 Report status: an empty batch asks the caller to back off
    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      return Status.READY;
    }
  } catch (InterruptedException err) {
    LOG.warn(getName() + ": Thread was interrupted.", err);
    // Remember the interrupt; the flag is restored only after the transaction has
    // been rolled back and closed, so that cleanup runs with the flag still cleared.
    interrupted = true;
    return Status.BACKOFF;
  } catch (Exception e) {
    sinkCounter.incrementEventWriteOrChannelFail(e);
    throw new EventDeliveryException(e);
  } finally {
    try {
      if (!success) {
        transaction.rollback();
      }
    } finally {
      try {
        // Always close, even if rollback() threw.
        transaction.close();
      } finally {
        if (interrupted) {
          // Re-assert the interrupt so the calling thread can observe it.
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}