public void openFile()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [95:156]


    /**
     * Opens (creating it if necessary) the local staging file for the given offset and points this
     * writer's stream state at it.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>ensure the {@code basePath} directory exists;</li>
     *   <li>resolve the file path through {@code getFilePath} and normalize it;</li>
     *   <li>create the file and, only when it was newly created, try to tighten its permissions
     *       (best effort - a failure is logged, never thrown);</li>
     *   <li>open a truncated {@link FileOutputStream}, wrap it in a {@link GZIPOutputStream} and a
     *       {@code CountingOutputStream}, and obtain a record writer for it.</li>
     * </ol>
     *
     * <p>The instance fields ({@code currentFileDescriptor}, {@code currentFile},
     * {@code countingStream}, {@code outputStream}, {@code recordWriter}) are updated only after
     * every fallible step has succeeded. If any step after the file is opened fails, the stream is
     * closed before the exception propagates, so no file descriptor is leaked and the writer is not
     * left half-switched to the new file.
     *
     * @param offset value handed to {@code getFilePath} to derive the file path; may be {@code null}
     * @throws IOException if the base directory cannot be created, or the file cannot be created,
     *         opened, truncated or wrapped
     * @throws RuntimeException if the resolved path cannot be normalized
     */
    public void openFile(@Nullable Long offset) throws IOException {
        SourceFile fileProps = new SourceFile();
        File folder = new File(basePath);
        if (!folder.exists() && !folder.mkdirs()) {
            // mkdirs() also returns false when another thread/process created the directory between
            // the exists() check and the mkdirs() call, so re-check before treating it as a failure.
            if (!folder.exists()) {
                throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
            }
            log.warn("Couldn't create the directory because it already exists (likely a race condition)");
        }
        String filePath = getFilePath.apply(offset);
        fileProps.path = filePath;

        // Normalize the path before touching the file system; normalize() yields null for a path it
        // cannot resolve.
        String sanitizedFilePath = FilenameUtils.normalize(filePath);
        if (sanitizedFilePath == null) {
            /*
             * This condition should not occur at all. The files are created in a controlled manner with names built from the DB name and table name,
             * which does not permit names like "../../" or "./". The check is kept as an additional safeguard.
             */
            String errorMessage = String.format("Exception creating local file for write." +
                    "File %s has a non canonical path", filePath);
            throw new RuntimeException(errorMessage);
        }
        File file = new File(sanitizedFilePath);
        boolean createFile = file.createNewFile(); // an I/O or security failure while creating the file surfaces here
        if (createFile) {
            /*
             * Best-effort permission tightening, applied only when this call created the file. A failure here must not fail the ingestion, so it is
             * logged as a warning instead of being thrown.
             *
             * NOTE(review): per the java.io.File documentation, setReadable(true, true) / setWritable(true, true) grant the permission to the owner;
             * they do not by themselves revoke group/other access. Confirm whether an explicit revoke is wanted here.
             */
            try {
                boolean execResult = file.setReadable(true, true);
                execResult = execResult && file.setWritable(true, true);
                execResult = execResult && file.setExecutable(false, false);
                if (!execResult) {
                    log.warn("Setting permissions creating file {} returned false." +
                            "The files set for ingestion can be read by other applications having access." +
                            "Please check security policies on the host that is preventing file permissions from being applied",
                            filePath);
                }
            } catch (Exception ex) {
                // Typically a SecurityException from a security manager; still only worth a warning.
                log.warn("Exception permissions creating file {} returned false." +
                        "The files set for ingestion can be read by other applications having access." +
                        "Please check security policies on the host that is preventing file permissions being applied",
                        filePath, ex);
            }
        }
        fileProps.file = file;

        // The underlying file stays open until the current countingStream (the abstraction used for
        // size-based writes) is finished and the file is rolled over, so try-with-resources cannot
        // be used here. Instead, close the stream explicitly if anything below fails.
        FileOutputStream fos = new FileOutputStream(file);
        try {
            FileDescriptor fileDescriptor = fos.getFD();
            fos.getChannel().truncate(0);
            // The GZIPOutputStream constructor writes the gzip header and can therefore throw.
            CountingOutputStream newCountingStream = new CountingOutputStream(new GZIPOutputStream(fos));
            // Last fallible step: the field is assigned only if getRecordWriter returns normally.
            recordWriter = recordWriterProvider.getRecordWriter(fileProps.path, newCountingStream);

            // Everything succeeded - publish the new state.
            currentFileDescriptor = fileDescriptor;
            currentFile = fileProps;
            countingStream = newCountingStream;
            outputStream = newCountingStream.getOutputStream();
        } catch (IOException | RuntimeException e) {
            try {
                fos.close();
            } catch (IOException closeFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }