in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [95:156]
/**
 * Opens a new local file for writing and wires up the GZIP/counting streams and the record writer on top of it.
 *
 * <p>Steps performed:
 * <ol>
 * <li>Ensures the {@code basePath} directory exists, creating it if required.</li>
 * <li>Resolves the file path for {@code offset} via {@code getFilePath} and normalizes it.</li>
 * <li>Creates the file; if it was newly created, tries to restrict it to owner read/write (best effort,
 * a failure is only logged).</li>
 * <li>Opens the file for writing (truncated to zero length) and replaces {@code currentFileDescriptor},
 * {@code currentFile}, {@code countingStream}, {@code outputStream} and {@code recordWriter}.</li>
 * </ol>
 *
 * <p>If any step after the file stream has been opened fails, the stream is closed before the exception is
 * propagated so that the file descriptor is not leaked. NOTE(review): the fields assigned before the failing step
 * are not rolled back - confirm callers do not reuse this writer after a failed open.
 *
 * @param offset value handed to {@code getFilePath} to derive the file path; may be {@code null}
 * @throws IOException if the directory cannot be created, or creating/opening/truncating the file or writing the
 *         GZIP header fails
 * @throws RuntimeException if the resolved file path cannot be normalized
 */
public void openFile(@Nullable Long offset) throws IOException {
    SourceFile fileProps = new SourceFile();
    File folder = new File(basePath);
    if (!folder.exists() && !folder.mkdirs()) {
        // mkdirs() also returns false when another thread/process created the directory in the meantime,
        // so re-check existence before treating this as a failure.
        if (!folder.exists()) {
            throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
        }
        log.warn("Couldn't create the directory because it already exists (likely a race condition)");
    }
    String filePath = getFilePath.apply(offset);
    fileProps.path = filePath;
    // Sanitize the file name, just to be sure, before touching the file system
    String sanitizedFilePath = FilenameUtils.normalize(filePath);
    if (sanitizedFilePath == null) {
        /*
         * This condition should not occur at all. The files are created in a controlled manner with names consisting
         * of the DB name and table name, which does not permit names like "../../" or "./" etc. This is an
         * additional safety check.
         */
        String errorMessage = String.format("Exception creating local file for write. " +
                "File %s has a non canonical path", filePath);
        throw new RuntimeException(errorMessage);
    }
    File file = new File(sanitizedFilePath);
    boolean createFile = file.createNewFile(); // if there is a runtime exception, it gets thrown from here
    if (createFile) {
        /*
         * Restrict the permissions on the file. If they cannot be set we only warn: failing the ingestion for not
         * being able to set permissions could mean data loss or other unexpected scenarios. This is done in a
         * conditional because the permissions are applied only when the file has just been created.
         */
        try {
            boolean execResult = file.setReadable(true, true);
            execResult = execResult && file.setWritable(true, true);
            execResult = execResult && file.setExecutable(false, false);
            if (!execResult) {
                log.warn("Setting permissions creating file {} returned false. " +
                        "The files set for ingestion can be read by other applications having access. " +
                        "Please check security policies on the host that is preventing file permissions from being applied",
                        filePath);
            }
        } catch (Exception ex) {
            // There is a likely chance of the permissions not getting set, hence this is only a warning
            log.warn("Exception permissions creating file {} returned false. " +
                    "The files set for ingestion can be read by other applications having access. " +
                    "Please check security policies on the host that is preventing file permissions being applied",
                    filePath, ex);
        }
    }
    // On success the underlying file stream is deliberately left open; it is closed only when the current
    // countingStream (abstraction for size based writes) and the file are rolled over.
    FileOutputStream fos = new FileOutputStream(file);
    try {
        currentFileDescriptor = fos.getFD();
        fos.getChannel().truncate(0);
        fileProps.file = file;
        currentFile = fileProps;
        // The GZIPOutputStream constructor writes the GZIP header and may therefore throw an IOException
        countingStream = new CountingOutputStream(new GZIPOutputStream(fos));
        outputStream = countingStream.getOutputStream();
        recordWriter = recordWriterProvider.getRecordWriter(currentFile.path, countingStream);
    } catch (IOException | RuntimeException e) {
        // Nothing else owns the stream yet, so close it here to avoid leaking the file descriptor
        try {
            fos.close();
        } catch (IOException closeException) {
            e.addSuppressed(closeException);
        }
        throw e;
    }
}