in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [262:279]
/**
 * Time-based flush. Under the write lock, finishes the current file if it is dirty and then
 * resets the flush timer. Does nothing once the writer has been stopped.
 *
 * <p>Exceptions are not propagated to the caller: a description of the failure is stored in
 * {@code flushError} and logged together with the exception.
 */
void flushByTimeImpl() {
    // The time-based flush takes the write lock so that it cannot be starved.
    try (AutoCloseableLock ignored = new AutoCloseableLock(reentrantReadWriteLock.writeLock())) {
        if (!stopped) {
            // Dirtiness is checked only once the lock is held, so a write path that has
            // just flushed does not cause an empty file to be ingested.
            if (isDirty()) {
                finishFile(true);
            }
            resetFlushTimer(false);
        }
    } catch (Exception flushFailure) {
        // The lock resource has already been closed by the time this clause runs.
        // NOTE(review): currentFile is therefore read here without the lock - confirm that
        // no other thread can change it between the null check and the dereference.
        final String fileName;
        final long currentSize;
        if (currentFile == null) {
            fileName = "[no file created yet]";
            currentSize = 0;
        } else {
            fileName = currentFile.file.getName();
            currentSize = currentFile.rawBytes;
        }
        flushError = String.format("Error in flushByTime. Current file: %s, size: %d. ", fileName, currentSize);
        log.error(flushError, flushFailure);
    }
}