in flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java [338:368]
private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
HiveEndPoint endPoint)
throws HiveWriter.ConnectException, InterruptedException {
try {
HiveWriter writer = allWriters.get( endPoint );
if (writer == null) {
LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);
sinkCounter.incrementConnectionCreatedCount();
if (allWriters.size() > maxOpenConnections) {
int retired = closeIdleWriters();
if (retired == 0) {
closeEldestWriter();
}
}
allWriters.put(endPoint, writer);
activeWriters.put(endPoint, writer);
} else {
if (activeWriters.get(endPoint) == null) {
activeWriters.put(endPoint,writer);
}
}
return writer;
} catch (HiveWriter.ConnectException e) {
sinkCounter.incrementConnectionFailedCount();
throw e;
}
}