in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java [476:500]
/**
 * Flushes this writer through {@code doFlush()} and, when an idle timeout is
 * configured, re-arms the task that closes the writer once that timeout elapses.
 *
 * <p>{@code checkAndThrowInterruptedException()} is invoked first. If
 * {@code isBatchComplete()} returns true, nothing is flushed and no idle task
 * is scheduled.
 *
 * @throws IOException declared for the flush path ({@code doFlush()})
 * @throws InterruptedException presumably raised by
 *         {@code checkAndThrowInterruptedException()} when the calling thread
 *         has been interrupted -- confirm against that helper
 */
public synchronized void flush() throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  if (isBatchComplete()) {
    return;
  }
  doFlush();

  // A non-positive timeout means idle closing is not in use.
  if (idleTimeout <= 0) {
    return;
  }

  // Only (re)schedule when there is no pending idle task, or the pending one
  // was successfully cancelled. A failed cancel means the existing task has
  // already run or was already cancelled, so it must not be replaced here.
  boolean mayReschedule = (idleFuture == null) || idleFuture.cancel(false);
  if (!mayReschedule) {
    return;
  }

  Callable<Void> closeWhenIdle = new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
          System.currentTimeMillis());
      // The writer may have been closed between scheduling and execution.
      if (isOpen) {
        close(true);
      }
      return null;
    }
  };
  // idleTimeout is interpreted in seconds, counted from this flush.
  idleFuture = timedRollerPool.schedule(closeWhenIdle, idleTimeout, TimeUnit.SECONDS);
}