in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [409:454]
/**
 * Worker loop of the asynchronous loader.
 *
 * <p>While {@code started} is true, waits up to two seconds for a buffer on {@code
 * flushQueue}. Buffers without a label are end-of-flush markers and are skipped. When more
 * buffers are already queued, up to {@code flushQueueSize - 1} of them are drained and offered
 * to {@code mergeBuffer}; if the merge succeeds a single load is issued for the first buffer,
 * otherwise every labelled buffer in the batch is loaded individually.
 *
 * <p>Any {@link Exception} is logged, stored in {@code exception}, the queue is cleared and the
 * loop ends. {@code loadThreadAlive} is reset in a {@code finally} block so that it is also
 * cleared when a non-{@code Exception} throwable (for example an {@link Error}) terminates the
 * thread.
 */
public void run() {
    LOG.info("LoadAsyncExecutor start");
    loadThreadAlive = true;
    List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
    try {
        while (started.get()) {
            recordList.clear();
            try {
                BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
                if (buffer == null) {
                    // Timed out with nothing queued; loop again so that started is re-checked.
                    continue;
                }
                if (buffer.getLabelName() == null) {
                    // When the label is empty, it is the eof buffer for checkpoint flush.
                    continue;
                }
                recordList.add(buffer);
                boolean merge = false;
                if (!flushQueue.isEmpty()) {
                    // One slot of the batch is already taken by the polled buffer.
                    flushQueue.drainTo(recordList, flushQueueSize - 1);
                    if (mergeBuffer(recordList, buffer)) {
                        // NOTE(review): presumably mergeBuffer folds the drained buffers into
                        // `buffer`, so one load covers the whole batch - confirm in mergeBuffer.
                        load(buffer.getLabelName(), buffer);
                        merge = true;
                    }
                }
                if (!merge) {
                    for (BatchRecordBuffer bf : recordList) {
                        if (bf == null || bf.getLabelName() == null) {
                            // When the label is empty, it's eof buffer for checkpointFlush.
                            continue;
                        }
                        load(bf.getLabelName(), bf);
                    }
                }
            } catch (Exception e) {
                LOG.error("worker running error", e);
                exception.set(e);
                // clear queue to avoid writer thread blocking
                flushQueue.clear();
                if (e instanceof InterruptedException) {
                    // Catching the exception cleared the interrupt status; restore it so the
                    // code that owns this thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                break;
            }
        }
    } finally {
        // Reset on every exit path, including throwables not handled by the catch above, so
        // the flag never keeps reporting a live thread after this method has ended.
        loadThreadAlive = false;
        LOG.info("LoadAsyncExecutor stop");
    }
}