in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java [214:302]
/**
 * Opens a new in-use file for this bucket and, if time-based rolling is
 * enabled, schedules the roll.
 *
 * <p>The file name is {@code fileName + "." + counter}, followed by
 * {@code fileSuffix} when one is set, otherwise by the codec's default
 * extension when a codec is set. {@code bucketPath} is that name wrapped in
 * the in-use prefix/suffix under {@code filePath}; {@code targetPath} is the
 * same name without them.
 *
 * <p>Any exception raised while naming or opening the file increments the
 * sink's connection-failed counter before it is rethrown.
 *
 * @throws IOException if {@code filePath} or {@code writer} is null, or if
 *     the guarded open section fails with an {@code IOException}
 * @throws InterruptedException if {@code checkAndThrowInterruptedException()}
 *     raises it, or if the guarded open section fails with one
 */
private void open() throws IOException, InterruptedException {
  if ((filePath == null) || (writer == null)) {
    throw new IOException("Invalid file settings");
  }

  final Configuration config = new Configuration();
  // disable FileSystem JVM shutdown hook
  config.setBoolean("fs.automatic.close", false);

  // Hadoop is not thread safe when doing certain RPC operations,
  // including getFileSystem(), when running under Kerberos.
  // open() must be called by one thread at a time in the JVM.
  // NOTE: tried synchronizing on the underlying Kerberos principal previously
  // which caused deadlocks. See FLUME-1231.
  synchronized (staticLock) {
    checkAndThrowInterruptedException();

    try {
      long counter = fileExtensionCounter.incrementAndGet();

      String fullFileName = fileName + "." + counter;
      if (fileSuffix != null && fileSuffix.length() > 0) {
        // an explicit suffix takes precedence over the codec extension
        fullFileName += fileSuffix;
      } else if (codeC != null) {
        fullFileName += codeC.getDefaultExtension();
      }

      bucketPath = filePath + "/" + inUsePrefix
          + fullFileName + inUseSuffix;
      targetPath = filePath + "/" + fullFileName;

      LOG.info("Creating " + bucketPath);
      callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          // Need to get reference to FS using above config before the
          // underlying writer does, in order to avoid the shutdown hook &
          // IllegalStateExceptions. Skipped when a mock FS was injected.
          if (!mockFsInjected) {
            fileSystem = new Path(bucketPath).getFileSystem(config);
          }
          if (codeC == null) {
            writer.open(bucketPath);
          } else {
            writer.open(bucketPath, codeC, compType);
          }
          return null;
        }
      });
    } catch (Exception ex) {
      sinkCounter.incrementConnectionFailedCount();
      if (ex instanceof IOException) {
        throw (IOException) ex;
      } else if (ex instanceof InterruptedException) {
        // Rethrow as the declared checked type. Throwables.propagate would
        // wrap it in a RuntimeException and hide the interruption from
        // callers that handle InterruptedException.
        // NOTE(review): callWithTimeout is not visible here; presumably it
        // can surface InterruptedException -- confirm.
        throw (InterruptedException) ex;
      } else {
        throw Throwables.propagate(ex);
      }
    }
  }
  isClosedMethod = getRefIsClosed();
  sinkCounter.incrementConnectionCreatedCount();
  resetCounters();

  // if time-based rolling is enabled, schedule the roll
  if (rollInterval > 0) {
    Callable<Void> action = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
            bucketPath, rollInterval);
        try {
          // Roll the file and remove reference from sfWriters map.
          close(true);
        } catch (Throwable t) {
          // log instead of rethrowing so a failed roll does not escape
          // the scheduled task
          LOG.error("Unexpected error", t);
        }
        return null;
      }
    };
    timedRollFuture = timedRollerPool.schedule(action, rollInterval,
        TimeUnit.SECONDS);
  }

  // only marked open once the file is created and any roll is scheduled
  isOpen = true;
}