in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [290:327]
public void abort(int txnId) throws StreamLoadException {
LOG.info("start abort transaction {}.", txnId);
try (CloseableHttpClient client = getHttpClient()) {
String abortUrl = String.format(abortUrlPattern, getBackend(), db, tbl);
HttpPut httpPut = new HttpPut(abortUrl);
httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
httpPut.setHeader("txn_operation", "abort");
httpPut.setHeader("txn_id", String.valueOf(txnId));
CloseableHttpResponse response = client.execute(httpPut);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200 || response.getEntity() == null) {
LOG.warn("abort transaction response: " + response.getStatusLine().toString());
throw new StreamLoadException("Fail to abort transaction " + txnId + " with url " + abortUrl);
}
ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if (!"Success".equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new StreamLoadException("try abort committed transaction, " + "do you recover from old savepoint?");
}
LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnId, res.get("msg"));
}
} catch (IOException e) {
throw new StreamLoadException(e);
}
LOG.info("abort transaction {} succeed.", txnId);
}