in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java [102:174]
private void commitTransaction(DorisCommittable committable) throws IOException {
// basic params
HttpPutBuilder builder =
new HttpPutBuilder()
.addCommonHeader()
.baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
.addTxnId(committable.getTxnID())
.commit();
// hostPort
String hostPort = committable.getHostPort();
LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
Throwable ex = new Throwable();
int retry = 0;
while (retry <= maxRetry) {
// get latest-url
String url = String.format(commitPattern, hostPort, committable.getDb());
HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
// http execute...
try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
StatusLine statusLine = response.getStatusLine();
if (200 == statusLine.getStatusCode()) {
if (response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
jsonMapper.readValue(
loadResult,
new TypeReference<HashMap<String, String>>() {});
if (res.get("status").equals(SUCCESS)) {
LOG.info("load result {}", loadResult);
} else if (ResponseUtil.isCommitted(res.get("msg"))) {
LOG.info(
"transaction {} has already committed successfully, skipping, load response is {}",
committable.getTxnID(),
res.get("msg"));
} else {
throw new DorisRuntimeException(
"commit transaction failed " + loadResult);
}
return;
}
}
String reasonPhrase = statusLine.getReasonPhrase();
LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase);
if (retry == maxRetry) {
ex = new DorisRuntimeException("commit transaction error: " + reasonPhrase);
}
hostPort = backendUtil.getAvailableBackend();
} catch (Exception e) {
LOG.error("commit transaction failed, to retry, {}", e.getMessage());
ex = e;
hostPort = backendUtil.getAvailableBackend();
}
if (retry++ >= maxRetry) {
if (ignoreCommitError) {
// Generally used when txn(stored in checkpoint) expires and unexpected
// errors occur in commit.
// It should be noted that you must manually ensure that the txn has been
// successfully submitted to doris, otherwise there may be a risk of data
// loss.
LOG.error(
"Unable to commit transaction {} and data has been potentially lost ",
committable,
ex);
} else {
throw new DorisRuntimeException("commit transaction error, ", ex);
}
}
}
}