private void commitTransaction()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java [75:135]


    /**
     * Commits one copy statement by POSTing it to the commit endpoint, retrying on failure.
     *
     * <p>The SQL is wrapped in a JSON object under the key {@code sql} and sent to the URL built
     * from {@code commitPattern} and the committable's host/port. An attempt is successful only
     * when the HTTP status is 200, the response carries a body, and {@code handleCommitResponse}
     * accepts that body. Up to {@code maxRetry + 1} attempts are made, back to back.
     *
     * @param committable supplies the target host/port and the copy SQL to commit
     * @throws IOException if the request payload cannot be serialized or closing the HTTP client
     *     fails; I/O errors raised while executing a single attempt are logged and retried instead
     * @throws CopyLoadException if no attempt succeeded; the message carries the last observed
     *     status, reason, response body and the SQL, and the last I/O error seen during the
     *     attempts (if any) is attached as a suppressed exception
     */
    private void commitTransaction(DorisCopyCommittable committable) throws IOException {
        String hostPort = committable.getHostPort();
        String copySQL = committable.getCopySQL();

        Map<String, String> params = new HashMap<>();
        params.put("sql", copySQL);
        // The payload and the URL are identical for every attempt, so build them only once.
        String payload = objectMapper.writeValueAsString(params);
        String commitUrl = String.format(commitPattern, hostPort);

        int statusCode = -1;
        String reasonPhrase = null;
        String loadResult = "";
        boolean success = false;
        // Last I/O failure seen across all attempts, kept so it can be reported on final failure.
        IOException lastError = null;
        int retry = 0;
        long start = System.currentTimeMillis();
        while (retry++ <= maxRetry) {
            LOG.info("commit with copy sql: {}", copySQL);
            // Explicit UTF-8: the single-argument StringEntity constructor falls back to
            // ISO-8859-1, which would corrupt non-ASCII characters in the SQL. The charset class
            // is fully qualified so the file's import block does not need to change.
            StringEntity entity =
                    new StringEntity(payload, java.nio.charset.StandardCharsets.UTF_8);
            HttpPostBuilder postBuilder = new HttpPostBuilder();
            postBuilder
                    .setUrl(commitUrl)
                    .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
                    .setEntity(entity);
            try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
                try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) {
                    statusCode = response.getStatusLine().getStatusCode();
                    reasonPhrase = response.getStatusLine().getReasonPhrase();
                    if (statusCode != 200) {
                        LOG.warn(
                                "commit failed with status {} {}, reason {}",
                                statusCode,
                                hostPort,
                                reasonPhrase);
                    } else if (response.getEntity() != null) {
                        // UTF-8 is only the fallback used when the response declares no charset.
                        loadResult =
                                EntityUtils.toString(
                                        response.getEntity(),
                                        java.nio.charset.StandardCharsets.UTF_8);
                        success = handleCommitResponse(loadResult);
                        if (success) {
                            LOG.info(
                                    "commit success cost {}ms, response is {}",
                                    System.currentTimeMillis() - start,
                                    loadResult);
                            break;
                        } else {
                            LOG.warn("commit failed, retry again");
                        }
                    } else {
                        // A 200 without a body cannot be validated, so treat it as a failed attempt.
                        LOG.warn("commit returned status 200 without a response body, retry again");
                    }
                } catch (IOException e) {
                    LOG.error("commit error : ", e);
                    lastError = e;
                }
            }
        }

        if (!success) {
            LOG.error(
                    "commit error with status {}, reason {}, response {}",
                    statusCode,
                    reasonPhrase,
                    loadResult);
            String copyErrMsg =
                    String.format(
                            "commit error, status: %d, reason: %s, response: %s, copySQL: %s",
                            statusCode, reasonPhrase, loadResult, copySQL);
            CopyLoadException failure = new CopyLoadException(copyErrMsg);
            if (lastError != null) {
                // Keep the underlying I/O failure reachable from the exception the caller sees.
                failure.addSuppressed(lastError);
            }
            throw failure;
        }
    }