in src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java [129:148]
public void insert(final Collection<SinkRecord> records) {
// note that records can be empty
for (SinkRecord record : records) {
// skip records
if (shouldSkipRecord(record)) {
continue;
}
// check topic mutating SMTs
checkTopicMutating(record);
// Might happen a count of record based flushing,buffer
insert(record);
}
// check all sink writer to see if they need to be flushed
for (DorisWriter writer : writer.values()) {
// Time based flushing
if (writer.shouldFlush()) {
writer.flushBuffer();
}
}
}