in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [350:377]
public boolean mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
boolean merge = false;
if (recordList.size() > 1) {
boolean sameTable =
recordList.stream()
.map(BatchRecordBuffer::getTableIdentifier)
.distinct()
.count()
== 1;
// Buffers can be merged only if they belong to the same table.
if (sameTable) {
for (BatchRecordBuffer recordBuffer : recordList) {
if (recordBuffer != null
&& recordBuffer.getLabelName() != null
&& !buffer.getLabelName().equals(recordBuffer.getLabelName())
&& !recordBuffer.getBuffer().isEmpty()) {
merge(buffer, recordBuffer);
merge = true;
}
}
LOG.info(
"merge {} buffer to one stream load, result bufferBytes {}",
recordList.size(),
buffer.getBufferSizeBytes());
}
}
return merge;
}