public DorisStreamLoad()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [104:157]


    /**
     * Creates a stream loader bound to one backend address and one target table.
     *
     * <p>The constructor derives the database and table names from the configured table
     * identifier, builds the load and abort URLs, caches the stream-load related flags
     * (two-phase commit, delete support, group commit, gzip compression) and allocates the
     * single-threaded executor and the record buffer stream used by this loader.
     *
     * @param hostPort address substituted into {@code LOAD_URL_PATTERN} and {@code
     *     ABORT_URL_PATTERN}
     * @param dorisOptions supplies the table identifier (expected as {@code db.table}), the user
     *     name and the password
     * @param executionOptions supplies the 2PC and delete flags, the stream load properties and
     *     the buffer size / buffer count / cache settings for the record stream
     * @param labelGenerator label generator kept for later use; its subtask id and table
     *     identifier are also used to name the upload thread
     * @param httpClient HTTP client kept by this loader
     * @throws IllegalArgumentException if the table identifier does not contain at least a
     *     database part and a table part separated by {@code '.'}
     */
    public DorisStreamLoad(
            String hostPort,
            DorisOptions dorisOptions,
            DorisExecutionOptions executionOptions,
            LabelGenerator labelGenerator,
            CloseableHttpClient httpClient) {
        this.hostPort = hostPort;
        String tableIdentifier = dorisOptions.getTableIdentifier();
        String[] tableInfo = tableIdentifier.split("\\.");
        if (tableInfo.length < 2) {
            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Table identifier must be in the form 'db.table', but was: "
                            + tableIdentifier);
        }
        this.db = tableInfo[0];
        this.table = tableInfo[1];
        this.user = dorisOptions.getUsername();
        this.passwd = dorisOptions.getPassword();
        this.labelGenerator = labelGenerator;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
        this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
        this.enable2PC = executionOptions.enabled2PC();
        this.streamLoadProp = executionOptions.getStreamLoadProp();
        this.enableDelete = executionOptions.getDeletable();
        this.httpClient = httpClient;

        // One worker thread fed by an unbounded queue: submitted tasks run one at a time.
        String threadName =
                String.format(
                        "stream-load-upload-%s-%s",
                        labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier());
        this.executorService =
                new ThreadPoolExecutor(
                        1,
                        1,
                        0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(),
                        new ExecutorThreadFactory(threadName));
        this.recordStream =
                new RecordStream(
                        executionOptions.getBufferSize(),
                        executionOptions.getBufferCount(),
                        executionOptions.isUseCache());

        if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
            // No line delimiter is kept when the configured format is ARROW.
            this.lineDelimiter = null;
        } else {
            // Encode explicitly as UTF-8 so the delimiter bytes do not depend on the JVM's
            // platform default charset.
            // NOTE(review): assumes records are written as UTF-8 as well - confirm against the
            // serializers feeding this loader.
            this.lineDelimiter =
                    EscapeHandler.escapeString(
                                    streamLoadProp.getProperty(
                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }

        // Group commit counts as enabled when the property is present and is not the "off" mode.
        this.enableGroupCommit =
                streamLoadProp.containsKey(GROUP_COMMIT)
                        && !streamLoadProp
                                .getProperty(GROUP_COMMIT)
                                .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
        this.enableGzCompress =
                streamLoadProp.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
        this.loadBatchFirstRecord = true;
    }