public void startLoad()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [346:448]


    public void startLoad(String label, boolean isResume) throws IOException {
        if (enableGroupCommit) {
            label = null;
        }
        loadBatchFirstRecord = !isResume;
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        recordStream.startInput(isResume);
        if (enableGroupCommit) {
            LOG.info("table {} stream load started with group commit on host {}", table, hostPort);
        } else {
            LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
        }
        this.currentLabel = label;
        try {
            InputStreamEntity entity = new InputStreamEntity(recordStream);
            putBuilder
                    .setUrl(loadUrlStr)
                    .baseAuth(user, passwd)
                    .addCommonHeader()
                    .addHiddenColumns(enableDelete)
                    .setLabel(label)
                    .setEntity(entity)
                    .addProperties(streamLoadProp);
            if (enable2PC) {
                putBuilder.enable2PC();
            }

            if (enableGzCompress) {
                putBuilder.setEntity(new GzipCompressingEntity(entity));
            }

            String executeMessage;
            if (enableGroupCommit) {
                executeMessage = "table " + table + " start execute load with group commit";
            } else {
                executeMessage = "table " + table + " start execute load for label " + label;
            }
            Thread mainThread = Thread.currentThread();
            pendingLoadFuture =
                    executorService.submit(
                            () -> {
                                LOG.info(executeMessage);
                                try {
                                    CloseableHttpResponse execute =
                                            httpClient.execute(putBuilder.build());
                                    RespContent respContent = handlePreCommitResponse(execute);

                                    if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                                        if (enable2PC
                                                && LoadStatus.LABEL_ALREADY_EXIST.equals(
                                                        respContent.getStatus())
                                                && !JOB_EXIST_FINISHED.equals(
                                                        respContent.getExistingJobStatus())) {
                                            LOG.info(
                                                    "try to abort {} cause status {}, exist job status {} ",
                                                    respContent.getLabel(),
                                                    respContent.getStatus(),
                                                    respContent.getExistingJobStatus());
                                            abortLabelExistTransaction(respContent);
                                            throw new LabelAlreadyExistsException(
                                                    "Exist label abort finished, retry");
                                        } else {
                                            String errMsg =
                                                    String.format(
                                                            "table %s.%s stream load error: %s, see more in %s",
                                                            getDb(),
                                                            getTable(),
                                                            respContent.getMessage(),
                                                            respContent.getErrorURL());
                                            LOG.error("Failed to load, {}", errMsg);
                                            throw new DorisRuntimeException(errMsg);
                                        }
                                    }
                                    return respContent;
                                } catch (NoRouteToHostException nex) {
                                    LOG.error("Failed to connect, cause ", nex);
                                    httpException = nex;
                                    mainThread.interrupt();
                                    throw new DorisRuntimeException(
                                            "No Route to Host to "
                                                    + hostPort
                                                    + ", exception: "
                                                    + nex);
                                } catch (Exception e) {
                                    LOG.error("Failed to execute load, cause ", e);
                                    httpException = e;
                                    // When an HTTP error occurs, the main thread should be
                                    // interrupted to prevent blocking
                                    mainThread.interrupt();
                                    throw e;
                                }
                            });
        } catch (Exception e) {
            String err;
            if (enableGroupCommit) {
                err = "failed to stream load data with group commit";
            } else {
                err = "failed to stream load data with label: " + label;
            }
            LOG.warn(err, e);
            throw e;
        }
    }