in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java [80:128]
/**
 * Sends the commit request for the transaction carried by {@code committable}.
 *
 * <p>The request is an HTTP PUT to the URL built from {@code commitPattern}, the current
 * host and the committable's database. When an attempt ends with a non-200 status or an
 * {@link IOException}, a new host is obtained from {@code RestService.getBackend} and the
 * request is sent again, until the attempt counter reaches {@code maxRetry}.
 *
 * <p>The method returns normally only after a 200 response whose body (if any) is either
 * not a {@code FAIL} status or is accepted by {@code ResponseUtil.isCommitted}.
 *
 * @param committable supplies the transaction id, the initial host and the database name
 * @throws IOException if the final attempt fails with an I/O error (the cause is attached)
 * @throws DorisRuntimeException if the response body reports a failed commit, if the final
 *         attempt returns a non-200 status, or if {@code maxRetry} is negative so that no
 *         attempt could be made
 */
private void commitTransaction(DorisCommittable committable) throws IOException {
    // Request parts that are identical for every attempt.
    HttpPutBuilder builder = new HttpPutBuilder()
            .addCommonHeader()
            .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
            .addTxnId(committable.getTxnID())
            .commit();
    String hostPort = committable.getHostPort();
    LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
    int retry = 0;
    while (retry++ <= maxRetry) {
        // retry is already >= 1 here. Using ">=" instead of "==" makes maxRetry == 0
        // fail loudly after its single attempt instead of leaving the loop silently.
        boolean lastAttempt = retry >= maxRetry;
        // Rebuild the URL on every attempt because hostPort may have been replaced.
        String url = String.format(commitPattern, hostPort, committable.getDb());
        HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
        try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
            StatusLine statusLine = response.getStatusLine();
            if (200 == statusLine.getStatusCode()) {
                if (response.getEntity() != null) {
                    String loadResult = EntityUtils.toString(response.getEntity());
                    Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
                    });
                    // NOTE(review): a body without a "status" key makes this line throw
                    // NullPointerException - confirm whether the server can omit it.
                    if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) {
                        throw new DorisRuntimeException("Commit failed " + loadResult);
                    }
                    LOG.info("load result {}", loadResult);
                }
                return;
            }
            String reasonPhrase = statusLine.getReasonPhrase();
            LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
            if (lastAttempt) {
                throw new DorisRuntimeException("stream load error: " + reasonPhrase);
            }
            hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
        } catch (IOException e) {
            LOG.error("commit transaction failed: ", e);
            if (lastAttempt) {
                // IOException does not expand "{}" placeholders, so build the text by hand.
                throw new IOException("commit transaction failed: " + e.getMessage(), e);
            }
            hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
        }
    }
    // Reached only when the loop body never ran (maxRetry < 0); never report success then.
    throw new DorisRuntimeException("commit transaction failed: no attempt was made, maxRetry=" + maxRetry);
}