in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java [75:135]
/**
 * Commits a copy transaction by POSTing the committable's copy SQL as a JSON body
 * ({@code {"sql": ...}}) to the commit URL derived from {@code commitPattern} and the
 * committable's host/port.
 *
 * <p>The request is attempted up to {@code maxRetry + 1} times. An attempt counts as successful
 * only when the response status is 200, the response carries an entity, and
 * {@code handleCommitResponse} accepts the response body. An {@link IOException} raised while
 * executing the request or reading the response is logged and the attempt is retried.
 *
 * @param committable supplies the target host/port and the copy SQL to commit
 * @throws IOException if the request payload cannot be built, or an I/O error escapes the
 *     per-attempt handling (for example while closing the HTTP client)
 * @throws CopyLoadException if no attempt succeeded; the message reports the status, reason and
 *     response of the final attempt, and the most recent I/O failure (if any) is attached as a
 *     suppressed exception
 */
private void commitTransaction(DorisCopyCommittable committable) throws IOException {
    String hostPort = committable.getHostPort();
    String copySQL = committable.getCopySQL();

    Map<String, String> params = new HashMap<>();
    params.put("sql", copySQL);
    // The payload and the target URL are identical for every attempt, so build them once.
    String requestBody = objectMapper.writeValueAsString(params);
    String commitUrl = String.format(commitPattern, hostPort);

    int statusCode = -1;
    String reasonPhrase = null;
    String loadResult = "";
    boolean success = false;
    // Most recent I/O failure seen by any attempt; attached to the final exception so the
    // underlying stack trace is not lost when all retries are exhausted.
    IOException lastIoError = null;
    int retry = 0;
    long start = System.currentTimeMillis();
    while (retry++ <= maxRetry) {
        // Reset per-attempt diagnostics so the final error never mixes values that came
        // from different attempts (e.g. a new status with an old response body).
        statusCode = -1;
        reasonPhrase = null;
        loadResult = "";

        LOG.info("commit with copy sql: {}", copySQL);
        HttpPostBuilder postBuilder = new HttpPostBuilder();
        postBuilder
                .setUrl(commitUrl)
                .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
                .setEntity(new StringEntity(requestBody));
        try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
            try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) {
                statusCode = response.getStatusLine().getStatusCode();
                reasonPhrase = response.getStatusLine().getReasonPhrase();
                if (statusCode != 200) {
                    LOG.warn(
                            "commit failed with status {} {}, reason {}",
                            statusCode,
                            hostPort,
                            reasonPhrase);
                } else if (response.getEntity() == null) {
                    // Previously this case was retried without leaving any trace in the log.
                    LOG.warn("commit response from {} has no entity, retry again", hostPort);
                } else {
                    loadResult = EntityUtils.toString(response.getEntity());
                    success = handleCommitResponse(loadResult);
                    if (success) {
                        LOG.info(
                                "commit success cost {}ms, response is {}",
                                System.currentTimeMillis() - start,
                                loadResult);
                        break;
                    } else {
                        LOG.warn("commit failed, retry again");
                    }
                }
            } catch (IOException e) {
                // Best-effort: a failed attempt is logged and retried rather than aborting.
                LOG.error("commit error : ", e);
                lastIoError = e;
            }
        }
    }

    if (!success) {
        LOG.error(
                "commit error with status {}, reason {}, response {}",
                statusCode,
                reasonPhrase,
                loadResult);
        String copyErrMsg =
                String.format(
                        "commit error, status: %d, reason: %s, response: %s, copySQL: %s",
                        statusCode, reasonPhrase, loadResult, copySQL);
        CopyLoadException failure = new CopyLoadException(copyErrMsg);
        if (lastIoError != null) {
            failure.addSuppressed(lastIoError);
        }
        throw failure;
    }
}