private Future buildReqAndExec()

in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java [383:434]


    private Future<StreamLoadResponse> buildReqAndExec(String host, Integer port, CloseableHttpClient client) {
        HttpPut httpPut = new HttpPut(URLs.streamLoad(host, port, database, table, isHttpsEnabled));
        try {
            handleStreamLoadProperties(httpPut);
        } catch (OptionRequiredException e) {
            throw new RuntimeException("stream load handle properties failed", e);
        }
        PipedInputStream pipedInputStream = new PipedInputStream(pipeSize);
        try {
            output = new PipedOutputStream(pipedInputStream);
        } catch (IOException e) {
            throw new RuntimeException("stream load create output failed", e);
        }
        HttpEntity entity = new InputStreamEntity(pipedInputStream);
        if (isGzipCompressionEnabled) {
            entity = new GzipCompressingEntity(entity);
        }
        httpPut.setEntity(entity);
        Thread currentThread = Thread.currentThread();

        logger.info("table {}.{} stream load started for {} on host {}:{}", database, table,
                currentLabel != null ? currentLabel : "group commit", host, port);
        return getExecutors().submit(() -> {
            StreamLoadResponse streamLoadResponse = null;
            try (CloseableHttpResponse response = client.execute(httpPut)) {
                // stream load http request finished unexpectedly
                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                    throw new StreamLoadException(
                            "stream load failed, status: " + response.getStatusLine().getStatusCode()
                                    + ", reason: " + response.getStatusLine().getReasonPhrase());
                }
                String entityStr = EntityUtils.toString(response.getEntity());
                logger.info("stream load response: " + entityStr);
                streamLoadResponse = MAPPER.readValue(entityStr, StreamLoadResponse.class);
                if (streamLoadResponse == null
                        || streamLoadResponse.getLabel() == null
                        || streamLoadResponse.getMessage() == null) {
                    throw new StreamLoadException("stream load failed, response error : " + entityStr);
                } else if (!streamLoadResponse.isSuccess()) {
                    throw new StreamLoadException(
                            "stream load failed, txnId: " + streamLoadResponse.getTxnId()
                                    + ", status: " + streamLoadResponse.getStatus()
                                    + ", msg: " + streamLoadResponse.getMessage());
                }
            } catch (Exception e) {
                logger.error("stream load exception", e);
                unexpectedException = e;
                currentThread.interrupt();
            }
            return streamLoadResponse;
        });
    }