public int load()

in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [198:243]


    public int load(String value, Boolean enable2PC) throws StreamLoadException {

        String label = generateLoadLabel();

        LoadResponse loadResponse;
        int responseHttpStatus = -1;
        try (CloseableHttpClient httpClient = getHttpClient()) {
            String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
            LOG.debug("Stream load Request:{} ,Body:{}", loadUrlStr, value);
            // only to record the BE node in case of an exception
            this.loadUrlStr = loadUrlStr;

            HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
            httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8));
            HttpResponse httpResponse = httpClient.execute(httpPut);
            responseHttpStatus = httpResponse.getStatusLine().getStatusCode();
            String respMsg = httpResponse.getStatusLine().getReasonPhrase();
            String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
            loadResponse = new LoadResponse(responseHttpStatus, respMsg, response);
        } catch (IOException e) {
            e.printStackTrace();
            String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark stream load with label: " + label;
            LOG.warn(err, e);
            loadResponse = new LoadResponse(responseHttpStatus, e.getMessage(), err);
        }

        if (loadResponse.status != HttpStatus.SC_OK) {
            LOG.info("Stream load Response HTTP Status Error:{}", loadResponse);
            // throw new StreamLoadException("stream load error: " + loadResponse.respContent);
            throw new StreamLoadException("stream load error");
        } else {
            ObjectMapper obj = new ObjectMapper();
            try {
                RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                    LOG.error("Stream load Response RES STATUS Error:{}", loadResponse);
                    throw new StreamLoadException("stream load error");
                }
                LOG.info("Stream load Response:{}", loadResponse);
                return respContent.getTxnId();
            } catch (IOException e) {
                throw new StreamLoadException(e);
            }
        }

    }