in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [281:304]
/**
 * Appends one sink record to the current file, opening a file first if none
 * is open and rotating afterwards when a rotation condition is met.
 *
 * <p>A {@code null} record is ignored. If {@code flushError} has been set,
 * no work is done and the error is surfaced instead.
 *
 * @param sinkRecord the record to write; may be {@code null}, in which case the call is a no-op
 * @throws IOException   declared by this method; presumably raised by the underlying
 *                       writer/file helpers (their bodies are not visible here)
 * @throws DataException declared by this method; presumably raised while writing the record
 * @throws ConnectException if {@code flushError} is non-null when the method is entered
 */
public void writeData(SinkRecord sinkRecord) throws IOException, DataException {
    // Surface a previously recorded flush failure before accepting more data.
    if (flushError != null) {
        throw new ConnectException(flushError);
    }

    // Nothing to write.
    if (sinkRecord == null) {
        return;
    }

    // The record writer is set up lazily from the first record seen.
    if (recordWriterProvider == null) {
        initializeRecordWriter(sinkRecord);
    }

    final long recordOffset = sinkRecord.kafkaOffset();

    // No file open yet: open one keyed by this record's offset and restart the flush timer.
    if (currentFile == null) {
        openFile(recordOffset);
        resetFlushTimer(true);
    }

    recordWriter.write(sinkRecord);

    // Keep the record itself alongside the file only when isDlqEnabled is set.
    if (isDlqEnabled) {
        currentFile.records.add(sinkRecord);
    }

    // Refresh the file's bookkeeping from the counting stream after the write.
    currentFile.rawBytes = countingStream.numBytes;
    currentFile.numRecords++;

    // Rotate when the flush interval is zero, when the file has grown past the
    // threshold, or when shouldWriteAvroAsBytes is set.
    final boolean rotationDue = flushInterval == 0
            || currentFile.rawBytes > fileThreshold
            || shouldWriteAvroAsBytes;
    if (rotationDue) {
        rotate(recordOffset);
        resetFlushTimer(true);
    }
}