in src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java [67:131]
private void commitTransaction(DorisCommittable committable) {
// basic params
HttpPutBuilder builder =
new HttpPutBuilder()
.addCommonHeader()
.baseAuth(dorisOptions.getUser(), dorisOptions.getPassword())
.addTxnId(committable.getTxnID())
.commit();
AtomicReference<String> hostPort = new AtomicReference<>(committable.getHostPort());
try {
BackoffAndRetryUtils.backoffAndRetry(
LoadOperation.COMMIT_TRANSACTION,
() -> {
// get latest-url
LOG.info(
"commit txn {} to host {}", committable.getTxnID(), hostPort.get());
String url =
String.format(COMMIT_PATTERN, hostPort.get(), committable.getDb());
HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
// http execute...
try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
StatusLine statusLine = response.getStatusLine();
if (200 == statusLine.getStatusCode()) {
String loadResult = null;
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
objectMapper.readValue(
loadResult,
new TypeReference<
HashMap<String, String>>() {});
if (!res.get("status").equals(LoadStatus.SUCCESS)
&& !ResponseUtil.isCommitted(res.get("msg"))) {
throw new StreamLoadException(
"commit transaction failed " + loadResult);
}
}
LOG.info("load result {}", loadResult);
return true;
}
String reasonPhrase = statusLine.getReasonPhrase();
LOG.error(
"commit failed with {}, reason {}",
hostPort.get(),
reasonPhrase);
hostPort.set(backendUtils.getAvailableBackend());
throw new StreamLoadException(
"commit failed with {"
+ hostPort.get()
+ "}, reason {"
+ reasonPhrase
+ "}");
} catch (Exception e) {
LOG.error("commit transaction failed, to retry, {}", e.getMessage());
hostPort.set(backendUtils.getAvailableBackend());
throw new StreamLoadException("commit transaction failed.", e);
}
});
} catch (Exception e) {
LOG.error("commit transaction error:", e);
throw new StreamLoadException("commit transaction error: " + e);
}
}