public void writeRecord()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [184:235]


    /**
     * Appends one record to the buffer of the given table and triggers a flush of that buffer
     * when a size/row threshold is reached.
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>Rethrow any previously recorded flush failure via {@code checkFlushException()}.
     *   <li>Under the table's read lock, look up (or create) the table buffer, insert the record
     *       and add the inserted byte count to {@code currentCacheBytes}.
     *   <li>If the total cached bytes exceed {@code maxBlockedBytes}, block the caller, re-checking
     *       at least once per second, until the cached bytes drop below that limit.
     *   <li>Request a flush of this table's buffer if it reached the configured max bytes/rows and
     *       the flush queue has room, or unconditionally if it reached the stream load limits.
     * </ol>
     *
     * @param database database name, used to build the buffer key
     * @param table table name, used to build the buffer key
     * @param record the serialized record to append
     * @throws RuntimeException if the calling thread is interrupted while waiting for cache space;
     *     the interrupt status is restored before throwing
     */
    public void writeRecord(String database, String table, byte[] record) {
        checkFlushException();
        String bufferKey = getTableIdentifier(database, table);

        BatchRecordBuffer buffer;
        getLock(bufferKey).readLock().lock();
        try {
            buffer =
                    bufferMap.computeIfAbsent(
                            bufferKey,
                            k ->
                                    new BatchRecordBuffer(
                                            database,
                                            table,
                                            this.lineDelimiter,
                                            executionOptions.getBufferFlushIntervalMs()));

            int bytes = buffer.insert(record);
            currentCacheBytes.addAndGet(bytes);
        } finally {
            // Always release the read lock, even if buffer creation or insert throws;
            // otherwise the lock would stay held by this thread.
            getLock(bufferKey).readLock().unlock();
        }

        if (currentCacheBytes.get() > maxBlockedBytes) {
            lock.lock();
            try {
                while (currentCacheBytes.get() >= maxBlockedBytes) {
                    // Surface flush failures while waiting instead of blocking on them.
                    checkFlushException();
                    LOG.info(
                            "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}",
                            currentCacheBytes.get(),
                            maxBlockedBytes);
                    // Timed wait so the condition is re-evaluated at least once per second.
                    block.await(1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                this.exception.set(e);
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }

        // queue has space, flush according to the bufferMaxRows/bufferMaxBytes
        if (flushQueue.size() < executionOptions.getFlushQueueSize()
                && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
                        || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
            boolean flush = bufferFullFlush(bufferKey);
            LOG.info("trigger flush by buffer full, flush: {}", flush);

        } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
                || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
            // The buffer capacity exceeds the stream load limit, flush
            // regardless of how full the flush queue is.
            boolean flush = bufferFullFlush(bufferKey);
            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
        }
    }