in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java [488:529]
public void stop() {
// do not constrain close() calls with a timeout
synchronized (sfWritersLock) {
for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
LOG.info("Closing {}", entry.getKey());
try {
entry.getValue().close(false, true);
} catch (Exception ex) {
LOG.warn("Exception while closing " + entry.getKey() + ". " +
"Exception follows.", ex);
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
}
// shut down all our thread pools
ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool };
for (ExecutorService execService : toShutdown) {
execService.shutdown();
try {
while (execService.isTerminated() == false) {
execService.awaitTermination(
Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
}
} catch (InterruptedException ex) {
LOG.warn("shutdown interrupted on " + execService, ex);
}
}
callTimeoutPool = null;
timedRollerPool = null;
synchronized (sfWritersLock) {
sfWriters.clear();
sfWriters = null;
}
sinkCounter.stop();
super.stop();
}