in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [163:193]
void finishFile(boolean delete) throws IOException, DataException {
    if (!isDirty()) {
        // Nothing was buffered for this file. Closing the stream here also closes the
        // underlying FileOutputStream; drop the file reference since it won't be ingested.
        outputStream.close();
        currentFile = null;
        return;
    }
    recordWriter.commit();
    // GZIP compression is in use: finish() writes the compressed trailer so the file is
    // complete for ingestion, while close() is deferred until after this flush completes.
    // We reach this point when a size or time limit is hit, just before the file is
    // rolled and a fresh one is opened.
    outputStream.finish();
    // The task may have been stopped while we were waiting on the lock; in that case
    // skip ingestion entirely.
    if (stopped) {
        return;
    }
    try {
        onRollCallback.accept(currentFile);
    } catch (ConnectException e) {
        // Swallow and keep processing subsequent records when behavior.on.error is not
        // set to fail mode; re-throw/log with only a short message so we don't pollute
        // the logs with a duplicate stack trace.
        handleErrors("Failed to write records to KustoDB.", e);
    }
    if (delete) {
        dumpFile();
    }
}