in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [262:290]
/**
 * Flushes either one named buffer or every eligible buffer held in {@code bufferMap}.
 *
 * <p>The method is {@code synchronized}, so only one thread at a time can run it on a given
 * instance.
 *
 * @param bufferKey key of the buffer to flush; {@code null} means "scan all buffers"
 * @param waitUtilDone when {@code true}, every scanned buffer is flushed regardless of {@code
 *     shouldFlush()} and {@code waitAsyncLoadFinish()} is invoked before returning; when {@code
 *     false}, only buffers whose {@code shouldFlush()} returns {@code true} are flushed
 * @return {@code false} when {@code waitUtilDone} is {@code false} and either the map is empty
 *     or the full scan flushed nothing; {@code true} otherwise (including the case where an
 *     explicit {@code bufferKey} is not present in the map)
 */
private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
    final boolean conditionalOnly = !waitUtilDone;

    // Nothing buffered and the caller does not need to wait: another thread may have
    // already flushed everything.
    if (conditionalOnly && bufferMap.isEmpty()) {
        LOG.info("bufferMap is empty, no need to flush {}", bufferKey);
        return false;
    }

    if (bufferKey != null) {
        // Targeted flush of a single buffer.
        if (bufferMap.containsKey(bufferKey)) {
            flushBuffer(bufferKey);
        } else {
            LOG.warn("buffer not found for key: {}, may be already flushed.", bufferKey);
        }
    } else {
        // Full scan over every buffer currently registered.
        boolean anyFlushed = false;
        for (String key : bufferMap.keySet()) {
            BatchRecordBuffer candidate = bufferMap.get(key);
            // In conditional mode, skip buffers that do not yet report shouldFlush()
            // (presumably the size/interval threshold check - confirm in BatchRecordBuffer).
            if (conditionalOnly && !candidate.shouldFlush()) {
                continue;
            }
            flushBuffer(key);
            anyFlushed = true;
        }
        if (conditionalOnly && !anyFlushed) {
            return false;
        }
    }

    if (waitUtilDone) {
        // Presumably blocks until queued asynchronous loads complete - see waitAsyncLoadFinish().
        waitAsyncLoadFinish();
    }
    return true;
}