public DorisBatchStreamLoad()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [118:176]


    public DorisBatchStreamLoad(
            DorisOptions dorisOptions,
            DorisReadOptions dorisReadOptions,
            DorisExecutionOptions executionOptions,
            LabelGenerator labelGenerator,
            int subTaskId) {
        this.backendUtil =
                StringUtils.isNotEmpty(dorisOptions.getBenodes())
                        ? new BackendUtil(dorisOptions.getBenodes())
                        : new BackendUtil(
                                RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
        this.hostPort = backendUtil.getAvailableBackend();
        this.username = dorisOptions.getUsername();
        this.password = dorisOptions.getPassword();
        this.loadProps = executionOptions.getStreamLoadProp();
        this.labelGenerator = labelGenerator;
        if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
            this.lineDelimiter = null;
        } else {
            this.lineDelimiter =
                    EscapeHandler.escapeString(
                                    loadProps.getProperty(
                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                            .getBytes();
        }
        this.enableGroupCommit =
                loadProps.containsKey(GROUP_COMMIT)
                        && !loadProps
                                .getProperty(GROUP_COMMIT)
                                .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
        this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
        this.executionOptions = executionOptions;
        this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
        // maxBlockedBytes ensures that a buffer can be written even if the queue is full
        this.maxBlockedBytes =
                (long) executionOptions.getBufferFlushMaxBytes()
                        * (executionOptions.getFlushQueueSize() + 1);
        if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
            String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
            Preconditions.checkState(
                    tableInfo.length == 2,
                    "tableIdentifier input error, the format is database.table");
            this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
        }
        this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
        this.loadExecutorService =
                new ThreadPoolExecutor(
                        1,
                        1,
                        0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(1),
                        new DefaultThreadFactory("streamload-executor"),
                        new ThreadPoolExecutor.AbortPolicy());
        this.started = new AtomicBoolean(true);
        this.loadExecutorService.execute(loadAsyncExecutor);
        this.subTaskId = subTaskId;
        this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
    }