in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [184:235]
/**
 * Buffers one record for the given database/table, blocking when the total cached bytes
 * exceed {@code maxBlockedBytes}, and triggers an asynchronous flush once the per-buffer
 * row/byte thresholds (or the stream-load hard limits) are reached.
 *
 * @param database target Doris database name
 * @param table target Doris table name
 * @param record serialized row bytes to append to the table's batch buffer
 * @throws RuntimeException if a previous flush failed (via {@code checkFlushException})
 *     or if the thread is interrupted while waiting for cache space
 */
public void writeRecord(String database, String table, byte[] record) {
    checkFlushException();
    String bufferKey = getTableIdentifier(database, table);
    // Hold the read lock while inserting so a concurrent flush (which takes the
    // write lock for this key) cannot swap the buffer out mid-insert.
    getLock(bufferKey).readLock().lock();
    BatchRecordBuffer buffer;
    try {
        buffer =
                bufferMap.computeIfAbsent(
                        bufferKey,
                        k ->
                                new BatchRecordBuffer(
                                        database,
                                        table,
                                        this.lineDelimiter,
                                        executionOptions.getBufferFlushIntervalMs()));
        int bytes = buffer.insert(record);
        currentCacheBytes.addAndGet(bytes);
    } finally {
        // Unlock in finally: previously an exception from insert() leaked the read
        // lock, which would deadlock the next write-lock acquisition for this key.
        getLock(bufferKey).readLock().unlock();
    }
    if (currentCacheBytes.get() > maxBlockedBytes) {
        lock.lock();
        try {
            // Back-pressure: wait (re-checking once per second) until the flush
            // threads drain the cache below the blocking threshold.
            while (currentCacheBytes.get() >= maxBlockedBytes) {
                checkFlushException();
                LOG.info(
                        "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}",
                        currentCacheBytes.get(),
                        maxBlockedBytes);
                block.await(1, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the surrounding framework (e.g. Flink
            // task cancellation) still observes the interruption.
            Thread.currentThread().interrupt();
            this.exception.set(e);
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
    // queue has space, flush according to the bufferMaxRows/bufferMaxBytes
    if (flushQueue.size() < executionOptions.getFlushQueueSize()
            && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
                    || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
        boolean flush = bufferFullFlush(bufferKey);
        LOG.info("trigger flush by buffer full, flush: {}", flush);
    } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
            || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
        // The buffer capacity exceeds the stream load limit, flush
        boolean flush = bufferFullFlush(bufferKey);
        LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
    }
}