private void commitTransaction()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java [102:174]


    /**
     * Sends the commit request for the transaction carried by {@code committable}.
     *
     * <p>The request is first sent to the host recorded in the committable. Up to {@code
     * maxRetry} further attempts are made, each against the host returned by {@code
     * backendUtil.getAvailableBackend()}. An attempt succeeds when the response is HTTP 200 with
     * a body whose {@code status} equals {@code SUCCESS}, or whose {@code msg} is recognised by
     * {@code ResponseUtil.isCommitted}. Every other outcome counts as a failed attempt.
     *
     * <p>Once all attempts have failed, the failure is either logged (when {@code
     * ignoreCommitError} is set) or rethrown wrapped in a {@link DorisRuntimeException}.
     *
     * @param committable the transaction to commit; supplies the txn id, database and initial host
     * @throws IOException declared by the signature; failures raised while executing an attempt
     *     are caught and retried rather than propagated directly
     * @throws DorisRuntimeException if every attempt fails and {@code ignoreCommitError} is false
     */
    private void commitTransaction(DorisCommittable committable) throws IOException {
        // basic params
        HttpPutBuilder builder =
                new HttpPutBuilder()
                        .addCommonHeader()
                        .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
                        .addTxnId(committable.getTxnID())
                        .commit();

        // hostPort
        String hostPort = committable.getHostPort();
        LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);

        // Most recent failure; always assigned before it is read because every failed attempt
        // records one.
        Throwable lastFailure = null;
        for (int attempt = 0; attempt <= maxRetry; attempt++) {
            // Rebuild the url on every attempt because hostPort changes between attempts.
            String url = String.format(commitPattern, hostPort, committable.getDb());
            HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();

            // http execute...
            try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
                StatusLine statusLine = response.getStatusLine();
                if (200 == statusLine.getStatusCode() && response.getEntity() != null) {
                    String loadResult = EntityUtils.toString(response.getEntity());
                    Map<String, String> res =
                            jsonMapper.readValue(
                                    loadResult,
                                    new TypeReference<HashMap<String, String>>() {});
                    // Constant-first comparison: a response without a "status" field must not
                    // surface as a NullPointerException.
                    if (SUCCESS.equals(res.get("status"))) {
                        LOG.info("load result {}", loadResult);
                    } else if (ResponseUtil.isCommitted(res.get("msg"))) {
                        LOG.info(
                                "transaction {} has already committed successfully, skipping, load response is {}",
                                committable.getTxnID(),
                                res.get("msg"));
                    } else {
                        // Caught by the handler below, so this counts as a failed attempt.
                        throw new DorisRuntimeException(
                                "commit transaction failed " + loadResult);
                    }
                    return;
                }
                String reasonPhrase = statusLine.getReasonPhrase();
                LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase);
                lastFailure =
                        new DorisRuntimeException("commit transaction error: " + reasonPhrase);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace and cause reach the log.
                LOG.error("commit transaction failed, to retry, {}", e.getMessage(), e);
                lastFailure = e;
            }

            if (attempt < maxRetry) {
                // Only look up another backend when there is an attempt left to use it; a
                // failing lookup after the last attempt would otherwise mask the real error.
                hostPort = backendUtil.getAvailableBackend();
                continue;
            }

            if (ignoreCommitError) {
                // Generally used when txn(stored in checkpoint) expires and unexpected
                // errors occur in commit.

                // It should be noted that you must manually ensure that the txn has been
                // successfully submitted to doris, otherwise there may be a risk of data
                // loss.
                LOG.error(
                        "Unable to commit transaction {} and data has been potentially lost ",
                        committable,
                        lastFailure);
            } else {
                throw new DorisRuntimeException("commit transaction error, ", lastFailure);
            }
        }
    }