in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [245:288]
/**
 * Commits a previously opened transaction by sending an HTTP PUT carrying the
 * {@code txn_operation: commit} and {@code txn_id} headers to a backend.
 *
 * <p>The request URL is produced by formatting {@code abortUrlPattern} with the
 * selected backend, database and table.
 * NOTE(review): the commit URL is built from the pattern named "abort";
 * presumably one endpoint serves both operations - confirm.
 *
 * @param txnId id of the transaction to commit
 * @throws StreamLoadException if the HTTP status is not 200, the response has no
 *         body, the body carries no {@code status} field, the body reports
 *         {@code status == "Fail"} while {@code ResponseUtil.isCommitted} rejects
 *         its {@code msg}, or an I/O error occurs while talking to the backend
 */
public void commit(int txnId) throws StreamLoadException {
    try (CloseableHttpClient client = getHttpClient()) {
        String backend = getBackend();
        String commitUrl = String.format(abortUrlPattern, backend, db, tbl);

        HttpPut httpPut = new HttpPut(commitUrl);
        httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
        httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
        httpPut.setHeader("txn_operation", "commit");
        httpPut.setHeader("txn_id", String.valueOf(txnId));

        // The response holds the underlying connection; close it on every path.
        try (CloseableHttpResponse response = client.execute(httpPut)) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200 || response.getEntity() == null) {
                LOG.warn("commit transaction response: {}", response.getStatusLine());
                throw new StreamLoadException("Fail to commit transaction " + txnId + " with url " + commitUrl);
            }

            String loadResult = EntityUtils.toString(response.getEntity());
            ObjectMapper mapper = new ObjectMapper();
            Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
            });

            String status = res.get("status");
            if (status == null) {
                // A body without a status cannot be interpreted; fail with context
                // instead of an opaque NullPointerException.
                throw new StreamLoadException("Commit response has no status: " + loadResult);
            }
            // A "Fail" status is still acceptable when the message shows the
            // transaction is already committed (as judged by ResponseUtil).
            if ("Fail".equals(status) && !ResponseUtil.isCommitted(res.get("msg"))) {
                throw new StreamLoadException("Commit failed " + loadResult);
            }
            LOG.info("load result {}", loadResult);
        }
    } catch (IOException e) {
        throw new StreamLoadException(e);
    }
}