in src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java [179:197]
/**
 * Commits to Doris every stream-load response currently queued on {@code dorisStreamLoad}.
 *
 * <p>Each queued {@link KafkaRespContent} is removed from the queue and converted into a
 * {@link DorisCommittable} that carries the host/port reported by {@code dorisStreamLoad}, the
 * response's database, transaction id, last offset, topic and table, plus the supplied
 * partition. The committables are appended to {@code committableList}, the response queue on
 * {@code dorisStreamLoad} is replaced with a fresh empty list, and the whole
 * {@code committableList} is handed to {@code dorisCommitter}.
 *
 * <p>NOTE(review): this method only appends to {@code committableList}; it never clears it.
 * Whether entries from earlier calls are removed elsewhere is not visible here - confirm.
 *
 * @param partition partition number recorded on every committable built by this call
 */
public void commit(int partition) {
    final Queue<KafkaRespContent> pendingResponses = dorisStreamLoad.getKafkaRespContents();

    // Drain the queue head-first, turning each response into a committable.
    while (!pendingResponses.isEmpty()) {
        final KafkaRespContent response = pendingResponses.poll();
        committableList.add(
                new DorisCommittable(
                        dorisStreamLoad.getHostPort(),
                        response.getDatabase(),
                        response.getTxnId(),
                        response.getLastOffset(),
                        response.getTopic(),
                        partition,
                        response.getTable()));
    }

    // Install a new, empty response queue before issuing the commit.
    dorisStreamLoad.setKafkaRespContents(new LinkedList<>());
    dorisCommitter.commit(committableList);
}