in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [257:283]
/**
 * Sends an abort request for the given transaction to {@code abortUrlStr} and
 * checks the reply.
 *
 * <p>The HTTP response is always closed before this method returns or throws,
 * so the underlying connection is released back to the client.
 *
 * @param txnID id of the transaction to abort
 * @throws DorisRuntimeException if the HTTP status is not 200 or the response
 *         carries no entity
 * @throws DorisException if the reply status is not {@code SUCCESS} and
 *         {@code ResponseUtil.isCommitted} reports true for the reply message
 * @throws Exception on I/O failure while executing the request or on failure
 *         to parse the reply body as a JSON object
 */
public void abortTransaction(long txnID) throws Exception {
    HttpPutBuilder builder = new HttpPutBuilder();
    builder.setUrl(abortUrlStr)
            .baseAuth(user, passwd)
            .addCommonHeader()
            .addTxnId(txnID)
            .setEmptyEntity()
            .abort();

    // try-with-resources: the response must be closed on every path (including
    // the throws below), otherwise the connection stays leased from the client.
    try (CloseableHttpResponse response = httpClient.execute(builder.build())) {
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200 || response.getEntity() == null) {
            LOG.warn("abort transaction response: {}", response.getStatusLine());
            throw new DorisRuntimeException("Fail to abort transaction " + txnID + " with url " + abortUrlStr);
        }

        // The reply body is parsed as a flat JSON object of string values.
        ObjectMapper mapper = new ObjectMapper();
        String loadResult = EntityUtils.toString(response.getEntity());
        Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>(){});

        if (!SUCCESS.equals(res.get("status"))) {
            // NOTE(review): isCommitted presumably detects an "already committed"
            // message; confirm how it treats a missing (null) "msg" field.
            if (ResponseUtil.isCommitted(res.get("msg"))) {
                throw new DorisException("try abort committed transaction, " +
                        "do you recover from old savepoint?");
            }
            // Any other non-success reply is tolerated: log it and return normally.
            LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg"));
        }
    }
}