private void commitTransaction()

in src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java [67:131]


    /**
     * Commits the transaction identified by {@code committable} by issuing an HTTP PUT built from
     * {@code COMMIT_PATTERN}, the current host and the committable's database.
     *
     * <p>The request is executed through {@link BackoffAndRetryUtils#backoffAndRetry}; the retry
     * policy itself lives in that helper. Whenever an attempt fails (non-200 status, a response
     * body that reports neither success nor an already-committed transaction, or any exception
     * during the call), a new host is requested from {@code backendUtils} exactly once so that a
     * subsequent attempt, if any, targets it.
     *
     * <p>A 200 response without an entity is treated as success.
     *
     * @param committable supplies the transaction id, the database and the initial host:port
     * @throws StreamLoadException if {@code backoffAndRetry} propagates a failure; the original
     *     exception is attached as the cause
     */
    private void commitTransaction(DorisCommittable committable) {
        // basic params
        HttpPutBuilder builder =
                new HttpPutBuilder()
                        .addCommonHeader()
                        .baseAuth(dorisOptions.getUser(), dorisOptions.getPassword())
                        .addTxnId(committable.getTxnID())
                        .commit();

        // Mutable holder: the retry lambda swaps in a new backend after a failed attempt.
        AtomicReference<String> hostPort = new AtomicReference<>(committable.getHostPort());
        try {
            BackoffAndRetryUtils.backoffAndRetry(
                    LoadOperation.COMMIT_TRANSACTION,
                    () -> {
                        // Snapshot the target once so every log line and error message of this
                        // attempt names the host that was actually contacted.
                        String targetHost = hostPort.get();
                        LOG.info("commit txn {} to host {}", committable.getTxnID(), targetHost);
                        String url =
                                String.format(COMMIT_PATTERN, targetHost, committable.getDb());
                        HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();

                        // http execute...
                        try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
                            StatusLine statusLine = response.getStatusLine();
                            if (200 != statusLine.getStatusCode()) {
                                String reasonPhrase = statusLine.getReasonPhrase();
                                LOG.error(
                                        "commit failed with {}, reason {}",
                                        targetHost,
                                        reasonPhrase);
                                // The catch block below switches the backend; doing it here as
                                // well would rotate twice and misreport the failed host.
                                throw new StreamLoadException(
                                        "commit failed with {"
                                                + targetHost
                                                + "}, reason {"
                                                + reasonPhrase
                                                + "}");
                            }

                            String loadResult = null;
                            if (response.getEntity() != null) {
                                loadResult = EntityUtils.toString(response.getEntity());
                                Map<String, String> res =
                                        objectMapper.readValue(
                                                loadResult,
                                                new TypeReference<
                                                        HashMap<String, String>>() {});
                                // Constant-first equals and an explicit null check: a payload
                                // lacking "status" or "msg" is reported as a failed commit with
                                // the raw body instead of surfacing as a NullPointerException.
                                String msg = res.get("msg");
                                if (!LoadStatus.SUCCESS.equals(res.get("status"))
                                        && (msg == null || !ResponseUtil.isCommitted(msg))) {
                                    throw new StreamLoadException(
                                            "commit transaction failed " + loadResult);
                                }
                            }
                            LOG.info("load result {}", loadResult);
                            return true;
                        } catch (Exception e) {
                            LOG.error("commit transaction failed, to retry, {}", e.getMessage());
                            // Single place where the target backend is replaced after a failure.
                            hostPort.set(backendUtils.getAvailableBackend());
                            throw new StreamLoadException("commit transaction failed.", e);
                        }
                    });
        } catch (Exception e) {
            LOG.error("commit transaction error:", e);
            // Keep the original message text but preserve the cause chain for callers.
            throw new StreamLoadException("commit transaction error: " + e, e);
        }
    }