protected void insertRecord()

in src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java [117:143]


    /**
     * Adds the given record to the current buffer, provided its Kafka offset is strictly greater
     * than both {@code offsetPersistedInDoris} and {@code processedOffset}; otherwise the record
     * is skipped and a warning is logged.
     *
     * <p>After buffering, if the buffer's byte size has reached {@code dorisOptions.getFileSize()},
     * or a non-zero {@code dorisOptions.getRecordNum()} is configured and the buffered record count
     * has reached it, the buffer is replaced with a fresh one and the filled buffer is flushed.
     * {@code processedOffset} is advanced to the record's offset only after any flush returns.
     *
     * @param record the sink record to buffer
     */
    protected void insertRecord(final SinkRecord record) {
        final long recordOffset = record.kafkaOffset();

        // Guard: a record at or below either tracked offset has already been handled; skip it.
        if (recordOffset <= this.offsetPersistedInDoris.get()
                || recordOffset <= processedOffset.get()) {
            LOG.warn(
                    "The record offset is smaller than processedOffset. recordOffset={}, offsetPersistedInDoris={}, processedOffset={}",
                    recordOffset,
                    offsetPersistedInDoris.get(),
                    processedOffset.get());
            return;
        }

        putBuffer(record);

        // A record-count limit of 0 means the count threshold is not applied.
        final boolean thresholdReached =
                buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
                        || (dorisOptions.getRecordNum() != 0
                                && buffer.getNumOfRecords() >= dorisOptions.getRecordNum());

        if (thresholdReached) {
            // Install a fresh buffer before flushing so the filled one is detached from the writer.
            final RecordBuffer filledBuffer = this.buffer;
            this.buffer = new RecordBuffer();
            flush(filledBuffer);
        }

        // Only mark the offset as processed once buffering (and any flush) has completed.
        processedOffset.set(recordOffset);
    }