private void commitTransaction()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java [80:128]


    private void commitTransaction(DorisCommittable committable) throws IOException {
        //basic params
        HttpPutBuilder builder = new HttpPutBuilder()
                .addCommonHeader()
                .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
                .addTxnId(committable.getTxnID())
                .commit();

        //hostPort
        String hostPort = committable.getHostPort();

        LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
        int retry = 0;
        while (retry++ <= maxRetry) {
            //get latest-url
            String url = String.format(commitPattern, hostPort, committable.getDb());
            HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();

            // http execute...
            try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
                StatusLine statusLine = response.getStatusLine();
                if (200 == statusLine.getStatusCode()) {
                    if (response.getEntity() != null) {
                        String loadResult = EntityUtils.toString(response.getEntity());
                        Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
                        });
                        if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) {
                            throw new DorisRuntimeException("Commit failed " + loadResult);
                        } else {
                            LOG.info("load result {}", loadResult);
                        }
                    }
                    return;
                }
                String reasonPhrase = statusLine.getReasonPhrase();
                LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
                if (retry == maxRetry) {
                    throw new DorisRuntimeException("stream load error: " + reasonPhrase);
                }
                hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
            } catch (IOException e) {
                LOG.error("commit transaction failed: ", e);
                if (retry == maxRetry) {
                    throw new IOException("commit transaction failed: {}", e);
                }
                hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
            }
        }
    }