in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java [429:469]
private synchronized void doClose(boolean immediate)
throws InterruptedException {
checkAndThrowInterruptedException();
try {
flush();
} catch (IOException e) {
LOG.warn("pre-close flush failed", e);
}
LOG.info("Closing {}", bucketPath);
if (isOpen) {
new CloseHandler().close(immediate);
isOpen = false;
} else {
LOG.info("HDFSWriter is already closed: {}", bucketPath);
}
// NOTE: timed rolls go through this codepath as well as other roll types
if (timedRollFuture != null && !timedRollFuture.isDone()) {
timedRollFuture.cancel(false); // do not cancel myself if running!
timedRollFuture = null;
}
if (idleFuture != null && !idleFuture.isDone()) {
idleFuture.cancel(false); // do not cancel myself if running!
idleFuture = null;
}
if (bucketPath != null && fileSystem != null) {
// could block or throw IOException
try {
renameBucket(bucketPath, targetPath, fileSystem);
} catch (Exception e) {
LOG.warn("failed to rename() file (" + bucketPath +
"). Exception follows.", e);
sinkCounter.incrementConnectionFailedCount();
final Callable<Void> scheduledRename = new ScheduledRenameCallable();
timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
}
}
}