public void commit()

in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [245:288]


    /**
     * Commits a previously pre-committed stream load transaction.
     *
     * <p>Sends an HTTP PUT carrying the {@code txn_operation: commit} and {@code txn_id}
     * headers to a backend and inspects the JSON body of the reply. A reply whose
     * {@code status} is {@code "Fail"} is tolerated only when
     * {@code ResponseUtil.isCommitted} accepts its {@code msg}.
     *
     * <p>NOTE(review): the URL is built from {@code abortUrlPattern}; presumably the
     * commit and abort operations share one endpoint and are distinguished only by the
     * {@code txn_operation} header - confirm against the field's definition.
     *
     * @param txnId id of the transaction to commit
     * @throws StreamLoadException if the HTTP status is not 200, the response has no body,
     *                             the body carries no {@code status} field, the backend
     *                             reports a failure, or an I/O error occurs
     */
    public void commit(int txnId) throws StreamLoadException {

        try (CloseableHttpClient client = getHttpClient()) {

            String backend = getBackend();
            String commitUrl = String.format(abortUrlPattern, backend, db, tbl);
            HttpPut httpPut = new HttpPut(commitUrl);
            httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
            httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
            httpPut.setHeader("txn_operation", "commit");
            httpPut.setHeader("txn_id", String.valueOf(txnId));

            // Close the response explicitly so its connection is released even when
            // one of the checks below throws.
            try (CloseableHttpResponse response = client.execute(httpPut)) {
                int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200 || response.getEntity() == null) {
                    LOG.warn("commit transaction response: {}", response.getStatusLine());
                    throw new StreamLoadException(
                            "Fail to commit transaction " + txnId + " with url " + commitUrl);
                }

                String loadResult = EntityUtils.toString(response.getEntity());
                ObjectMapper mapper = new ObjectMapper();
                Map<String, String> res = mapper.readValue(loadResult,
                        new TypeReference<HashMap<String, String>>() {
                        });

                // A body without a "status" field cannot confirm the commit; fail with the
                // declared exception type instead of an accidental NullPointerException.
                String status = res == null ? null : res.get("status");
                if (status == null) {
                    throw new StreamLoadException("Commit failed " + loadResult);
                }

                if ("Fail".equals(status) && !ResponseUtil.isCommitted(res.get("msg"))) {
                    throw new StreamLoadException("Commit failed " + loadResult);
                }
                LOG.info("load result {}", loadResult);
            }

        } catch (IOException e) {
            throw new StreamLoadException(e);
        }

    }