in src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java [117:143]
/**
 * Adds a record to the in-memory buffer and flushes the buffer once it is full.
 *
 * <p>A record is only accepted when its Kafka offset is strictly greater than both
 * {@code offsetPersistedInDoris} and {@code processedOffset}; otherwise it is skipped
 * and a warning is logged.
 *
 * <p>After the record is buffered, the buffer is handed to {@code flush} and replaced
 * by a fresh {@code RecordBuffer} when either its byte size has reached the configured
 * file size, or the configured record number is non-zero and the buffered record count
 * has reached it. Finally the record's offset is stored in {@code processedOffset}.
 *
 * @param record the sink record to buffer
 */
protected void insertRecord(final SinkRecord record) {
    final long recordOffset = record.kafkaOffset();

    // Skip records at or below an offset that was already persisted or processed.
    if (recordOffset <= this.offsetPersistedInDoris.get()
            || recordOffset <= processedOffset.get()) {
        LOG.warn(
                "The record offset is smaller than processedOffset. recordOffset={}, offsetPersistedInDoris={}, processedOffset={}",
                recordOffset,
                offsetPersistedInDoris.get(),
                processedOffset.get());
        return;
    }

    putBuffer(record);

    // A record number of 0 turns off the count-based trigger; only the size limit applies.
    final boolean flushThresholdReached =
            buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
                    || (dorisOptions.getRecordNum() != 0
                            && buffer.getNumOfRecords() >= dorisOptions.getRecordNum());
    if (flushThresholdReached) {
        // Swap in an empty buffer before flushing the full one.
        final RecordBuffer fullBuffer = buffer;
        this.buffer = new RecordBuffer();
        flush(fullBuffer);
    }

    processedOffset.set(recordOffset);
}