public void abort()

in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [290:327]


    /**
     * Asks the backend to abort the stream-load transaction identified by {@code txnId}.
     *
     * <p>An HTTP PUT is sent to the URL built from {@code abortUrlPattern}, {@code getBackend()},
     * {@code db} and {@code tbl}, carrying the headers {@code txn_operation: abort} and
     * {@code txn_id: <txnId>}. The JSON response body is parsed into a string map and its
     * {@code status} / {@code msg} entries decide the outcome.
     *
     * <p>A non-"Success" status that is not recognised by {@code ResponseUtil.isCommitted} is treated
     * as best-effort: it is logged as a warning and the method returns without throwing.
     *
     * @param txnId id of the transaction to abort
     * @throws StreamLoadException if the HTTP status is not 200 or the response has no body, if
     *         {@code ResponseUtil.isCommitted} reports true for the response {@code msg}, or if an
     *         {@link IOException} occurs while sending the request or reading/parsing the response
     */
    public void abort(int txnId) throws StreamLoadException {

        LOG.info("start abort transaction {}.", txnId);

        try (CloseableHttpClient client = getHttpClient()) {
            String abortUrl = String.format(abortUrlPattern, getBackend(), db, tbl);
            HttpPut httpPut = new HttpPut(abortUrl);
            httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
            httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
            httpPut.setHeader("txn_operation", "abort");
            httpPut.setHeader("txn_id", String.valueOf(txnId));

            // The response is a closeable resource of its own; release it on every exit path.
            try (CloseableHttpResponse response = client.execute(httpPut)) {
                int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200 || response.getEntity() == null) {
                    LOG.warn("abort transaction response: {}", response.getStatusLine());
                    throw new StreamLoadException("Fail to abort transaction " + txnId + " with url " + abortUrl);
                }

                ObjectMapper mapper = new ObjectMapper();
                String loadResult = EntityUtils.toString(response.getEntity());
                Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
                });
                if (!"Success".equals(res.get("status"))) {
                    if (ResponseUtil.isCommitted(res.get("msg"))) {
                        throw new StreamLoadException("try abort committed transaction, " + "do you recover from old savepoint?");
                    }
                    LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnId, res.get("msg"));
                    // Best-effort failure: do not throw, but do not fall through to the success log either.
                    return;
                }
            }

        } catch (IOException e) {
            throw new StreamLoadException(e);
        }

        LOG.info("abort transaction {} succeed.", txnId);

    }