private void executeCopyInto()

in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java [219:257]


    /**
     * Builds the COPY INTO statement for {@code fileName} and submits it to the given frontend over HTTP.
     *
     * <p>The call is considered successful only when the HTTP status is 200 and the parsed response
     * body reports a successful copy; every other outcome is surfaced as a {@link CopyIntoException}.
     *
     * @param frontend   frontend whose host and HTTP port receive the request
     * @param httpClient client used to execute the request; it is not closed by this method
     * @param fileName   file name handed to the COPY SQL builder
     * @throws OptionRequiredException propagated from the option lookups performed while building the request
     * @throws IOException             if serializing the request, executing it, or reading the response fails
     * @throws CopyIntoException       if the HTTP status is not 200, the response body cannot be parsed,
     *                                 or the parsed body reports that the copy did not succeed
     */
    private void executeCopyInto(Frontend frontend, CloseableHttpClient httpClient, String fileName) throws OptionRequiredException, IOException, CopyIntoException {
        String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
        Properties props = new Properties();
        properties.forEach(props::setProperty);
        String copySql = new CopySQLBuilder(format, props, tableIdentifier, fileName).buildCopySQL();
        LOG.info("build copy sql is {}", copySql);

        ObjectNode requestBody = MAPPER.createObjectNode();
        requestBody.put("sql", copySql);
        String queryUrl = URLs.copyIntoQuery(frontend.getHost(), frontend.getHttpPort(), false);
        // Encode the JSON body explicitly as UTF-8: the single-argument StringEntity constructor
        // falls back to ISO-8859-1, which corrupts non-Latin-1 characters in the SQL text.
        HttpPost queryReq = new HttpPostBuilder().setUrl(queryUrl)
                .baseAuth(getAuthEncoded())
                .setEntity(new StringEntity(MAPPER.writeValueAsString(requestBody), StandardCharsets.UTF_8)).build();

        CopyIntoResponse loadResponse;
        // try-with-resources guarantees the response (and its underlying connection) is released
        // on every path, including when reading the entity throws.
        try (CloseableHttpResponse queryRes = httpClient.execute(queryReq)) {
            int statusCode = queryRes.getStatusLine().getStatusCode();
            String statusMessage = queryRes.getStatusLine().getReasonPhrase();
            String responseContent = EntityUtils.toString(new BufferedHttpEntity(queryRes.getEntity()), StandardCharsets.UTF_8);
            loadResponse = new CopyIntoResponse(statusCode, statusMessage, responseContent);
        }

        if (loadResponse.getCode() != HttpStatus.SC_OK) {
            LOG.error(String.format("Execute copy sql status is not OK, status: %d, response: %s",
                    loadResponse.getCode(), loadResponse));
            throw new CopyIntoException(String.format("Execute copy sql, http status:%d, response:%s",
                    loadResponse.getCode(), loadResponse));
        }

        RespContent responseContentObject;
        try {
            responseContentObject = MAPPER.readValue(loadResponse.getContent(), RespContent.class);
        } catch (IOException e) {
            // A body that cannot be parsed is reported as a copy failure, keeping the parse error as cause.
            throw new CopyIntoException(e);
        }

        if (!responseContentObject.isCopyIntoSuccess()) {
            LOG.error(String.format("Execute copy sql status is not success, status: %s, response: %s",
                    responseContentObject.getStatus(), loadResponse));
            throw new CopyIntoException(String.format("Execute copy sql error, load status:%s, response:%s",
                    responseContentObject.getStatus(), loadResponse));
        }
        LOG.info("Execute copy sql Response: {}", loadResponse);
    }