public boolean executeCopy()

in src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java [94:146]


    /**
     * Builds a COPY statement for the given files and submits it over HTTP.
     *
     * <p>The SQL is produced by {@link CopySQLBuilder} and posted as a JSON body of the form
     * {@code {"sql": "..."}} to the URL obtained by formatting {@code COMMIT_PATTERN} with
     * {@code hostPort}, using basic authentication taken from {@code dorisOptions}. The request
     * is executed through {@code BackoffAndRetryUtils.backoffAndRetry} with
     * {@code LoadOperation.EXECUTE_COPY}; the retry policy itself is defined there.
     *
     * <p>An attempt is treated as successful only when the HTTP status is 200, the response has
     * a body, and {@code handleCommitResponse} accepts that body. Every other outcome raises a
     * {@link CopyLoadException} inside the attempt.
     *
     * @param fileList the files to reference in the generated COPY statement
     * @return {@code true} when the copy was committed; failures are reported by exception
     *     rather than by returning {@code false}
     * @throws CopyLoadException if the request ultimately fails; the message carries the
     *     generated SQL and the underlying failure is attached as the cause
     */
    public boolean executeCopy(List<String> fileList) {
        long start = System.currentTimeMillis();
        CopySQLBuilder copySQLBuilder =
                new CopySQLBuilder(database, table, fileList, dorisOptions.isEnableDelete());
        String copySQL = copySQLBuilder.buildCopySQL();
        LOG.info("build copy SQL is {}", copySQL);
        Map<String, String> params = new HashMap<>();
        params.put("sql", copySQL);
        try {
            BackoffAndRetryUtils.backoffAndRetry(
                    LoadOperation.EXECUTE_COPY,
                    () -> {
                        HttpPostBuilder postBuilder = new HttpPostBuilder();
                        // Encode the JSON body explicitly as UTF-8. StringEntity(String) falls
                        // back to ISO-8859-1, which would turn any non-ASCII character in the
                        // SQL (e.g. in a table or file name) into '?'.
                        postBuilder
                                .setUrl(String.format(COMMIT_PATTERN, hostPort))
                                .baseAuth(dorisOptions.getUser(), dorisOptions.getPassword())
                                .setEntity(
                                        new StringEntity(
                                                OBJECT_MAPPER.writeValueAsString(params),
                                                java.nio.charset.StandardCharsets.UTF_8));

                        try (CloseableHttpResponse response =
                                httpClient.execute(postBuilder.build())) {
                            final int statusCode = response.getStatusLine().getStatusCode();
                            final String reasonPhrase = response.getStatusLine().getReasonPhrase();
                            String loadResult = "";
                            if (statusCode != 200) {
                                LOG.warn(
                                        "commit failed with status {} {}, reason {}",
                                        statusCode,
                                        hostPort,
                                        reasonPhrase);
                                throw new CopyLoadException(
                                        "commit file failed, cause by: " + reasonPhrase);
                            } else if (response.getEntity() != null) {
                                // UTF-8 is only the fallback here: a charset declared by the
                                // response's Content-Type still takes precedence.
                                loadResult =
                                        EntityUtils.toString(
                                                response.getEntity(),
                                                java.nio.charset.StandardCharsets.UTF_8);
                                boolean success = handleCommitResponse(loadResult);
                                if (success) {
                                    LOG.info(
                                            "commit success cost {}ms, response is {}",
                                            System.currentTimeMillis() - start,
                                            loadResult);
                                    return true;
                                }
                            }
                            // Reached on HTTP 200 with no body (loadResult stays empty) or when
                            // handleCommitResponse rejected the body.
                            LOG.error("commit failed, cause by: {}", loadResult);
                            throw new CopyLoadException("commit failed, cause by: " + loadResult);
                        }
                    });
        } catch (Exception e) {
            String errMsg = "failed to execute copy, sql=" + copySQL;
            throw new CopyLoadException(errMsg, e);
        }
        // Only reached when backoffAndRetry returned without throwing.
        return true;
    }