in flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java [279:330]
private int drainOneBatch(Channel channel)
    throws HiveWriter.Failure, InterruptedException {
  int txnEventCount = 0;
  try {
    Map<HiveEndPoint, HiveWriter> activeWriters = Maps.newHashMap();
    for (; txnEventCount < batchSize; ++txnEventCount) {
      // 0) Read event from Channel
      Event event = channel.take();
      if (event == null) {
        break; // channel drained; finish the batch early
      }

      // 1) Create end point by substituting placeholders
      HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
          partitionVals, event.getHeaders(), timeZone,
          needRounding, roundUnit, roundValue, useLocalTime);

      // 2) Create or reuse Writer
      HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);

      // 3) Write
      LOG.debug("{} : Writing event to {}", getName(), endPoint);
      writer.write(event);
    } // for

    // 4) Update counters
    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }
    sinkCounter.addToEventDrainAttemptCount(txnEventCount);

    // 5) Flush all Writers, committing the open transaction on each
    for (HiveWriter writer : activeWriters.values()) {
      writer.flush(true);
    }

    sinkCounter.addToEventDrainSuccessCount(txnEventCount);
    return txnEventCount;
  } catch (HiveWriter.Failure e) {
    // in case of error we close all TxnBatches to start clean next time
    LOG.warn(getName() + " : " + e.getMessage(), e);
    abortAllWriters();
    closeAllWriters();
    throw e;
  }
}
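
// For context: a minimal sketch of the calling side, assuming the standard
// Flume Sink contract. It is illustrative, not the exact HiveSink code.
// process() opens a channel transaction, drains one batch via the method
// above, and commits or rolls back, so events taken with channel.take()
// are only consumed from the channel on success.
@Override
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction tx = channel.getTransaction();
  try {
    tx.begin();
    int drained = drainOneBatch(channel);
    tx.commit();  // events taken in this batch are now removed from the channel
    return drained == 0 ? Status.BACKOFF : Status.READY;
  } catch (Exception e) {
    tx.rollback();  // taken events are returned to the channel for redelivery
    throw new EventDeliveryException(getName() + ": failed to drain batch", e);
  } finally {
    tx.close();
  }
}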