public DorisBatchStreamLoad()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [92:120]


    /**
     * Creates a batch stream loader and immediately starts its background load task.
     *
     * <p>The constructor resolves a backend through {@link RestService#getBackendsV2}, derives the
     * load URL from the {@code database.table} identifier, pre-fills the write queue with empty
     * {@link BatchRecordBuffer} instances and submits the {@code LoadAsyncExecutor} to a
     * one-thread executor.
     *
     * @param dorisOptions     connection options; supplies the table identifier, username and password
     * @param dorisReadOptions read options passed through to the backend lookup
     * @param executionOptions supplies the stream-load properties, the flush queue size and the
     *                         maximum buffer size in bytes
     * @param labelGenerator   generator stored for later use when labelling loads
     * @throws IllegalArgumentException if the table identifier does not contain at least a
     *                                  database and a table segment separated by {@code '.'}
     */
    public DorisBatchStreamLoad(DorisOptions dorisOptions,
                                DorisReadOptions dorisReadOptions,
                                DorisExecutionOptions executionOptions,
                                LabelGenerator labelGenerator) {
        this.backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
        this.hostPort = backendUtil.getAvailableBackend();

        // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException
        // when the identifier has no '.' separator.
        String tableIdentifier = dorisOptions.getTableIdentifier();
        String[] tableInfo = tableIdentifier.split("\\.");
        if (tableInfo.length < 2) {
            throw new IllegalArgumentException(
                    "Invalid table identifier '" + tableIdentifier + "', expected format: database.table");
        }
        this.db = tableInfo[0];
        this.table = tableInfo[1];

        this.username = dorisOptions.getUsername();
        this.password = dorisOptions.getPassword();
        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
        this.loadProps = executionOptions.getStreamLoadProp();
        this.labelGenerator = labelGenerator;
        // Encode the delimiter with an explicit charset so the bytes do not depend on the
        // JVM's platform default encoding.
        this.lineDelimiter = EscapeHandler
                .escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        this.executionOptions = executionOptions;

        // Init queues: the write queue starts full of empty buffers, the read queue starts empty.
        int queueSize = executionOptions.getFlushQueueSize();
        int bufferMaxBytes = executionOptions.getBufferFlushMaxBytes();
        this.writeQueue = new ArrayBlockingQueue<>(queueSize);
        LOG.info("init RecordBuffer capacity {}, count {}", bufferMaxBytes, queueSize);
        for (int index = 0; index < queueSize; index++) {
            this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, bufferMaxBytes));
        }
        readQueue = new LinkedBlockingDeque<>();

        this.loadAsyncExecutor = new LoadAsyncExecutor();
        // Exactly one worker thread with a task queue of capacity 1; further submissions are
        // rejected by AbortPolicy.
        this.loadExecutorService = new ThreadPoolExecutor(
                1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new DefaultThreadFactory("streamload-executor"),
                new ThreadPoolExecutor.AbortPolicy());
        // The flag is set before the task is submitted.
        // NOTE(review): presumably LoadAsyncExecutor polls `started` in its run loop - confirm.
        this.started = new AtomicBoolean(true);
        this.loadExecutorService.execute(loadAsyncExecutor);
    }