in src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java [94:146]
/**
 * Builds a copy SQL statement for the given files and submits it over HTTP.
 *
 * <p>The statement is produced by {@code CopySQLBuilder} from {@code database}, {@code table},
 * {@code fileList} and {@code dorisOptions.isEnableDelete()}. It is sent as a JSON object with
 * the single key {@code "sql"} in a POST to the URL formatted from {@code COMMIT_PATTERN} and
 * {@code hostPort}, using basic auth taken from {@code dorisOptions}. The request runs inside
 * {@code BackoffAndRetryUtils.backoffAndRetry} under {@code LoadOperation.EXECUTE_COPY}; the
 * retry policy itself is defined by that utility.
 *
 * @param fileList files passed to {@code CopySQLBuilder} for the copy statement
 * @return {@code true} whenever the method returns normally; every failure path throws instead
 * @throws CopyLoadException if the request callback or {@code backoffAndRetry} ends with an
 *     exception; the original exception is attached as the cause
 */
public boolean executeCopy(List<String> fileList) {
    long start = System.currentTimeMillis();
    CopySQLBuilder copySQLBuilder =
            new CopySQLBuilder(database, table, fileList, dorisOptions.isEnableDelete());
    String copySQL = copySQLBuilder.buildCopySQL();
    LOG.info("build copy SQL is {}", copySQL);
    Map<String, String> params = new HashMap<>();
    params.put("sql", copySQL);
    try {
        BackoffAndRetryUtils.backoffAndRetry(
                LoadOperation.EXECUTE_COPY,
                () -> {
                    HttpPostBuilder postBuilder = new HttpPostBuilder();
                    postBuilder
                            .setUrl(String.format(COMMIT_PATTERN, hostPort))
                            .baseAuth(dorisOptions.getUser(), dorisOptions.getPassword())
                            .setEntity(
                                    // Encode explicitly as UTF-8: the single-argument
                                    // StringEntity constructor falls back to ISO-8859-1,
                                    // which corrupts non-ASCII characters in the SQL.
                                    new StringEntity(
                                            OBJECT_MAPPER.writeValueAsString(params),
                                            java.nio.charset.StandardCharsets.UTF_8));
                    try (CloseableHttpResponse response =
                            httpClient.execute(postBuilder.build())) {
                        final int statusCode = response.getStatusLine().getStatusCode();
                        final String reasonPhrase = response.getStatusLine().getReasonPhrase();
                        if (statusCode != 200) {
                            LOG.warn(
                                    "commit failed with status {} {}, reason {}",
                                    statusCode,
                                    hostPort,
                                    reasonPhrase);
                            throw new CopyLoadException(
                                    "commit file failed, cause by: " + reasonPhrase);
                        }
                        // Stays empty when the 200 response carries no body.
                        String loadResult = "";
                        if (response.getEntity() != null) {
                            // UTF-8 is only the fallback; a charset declared by the
                            // response's Content-Type still takes precedence.
                            loadResult =
                                    EntityUtils.toString(
                                            response.getEntity(),
                                            java.nio.charset.StandardCharsets.UTF_8);
                            if (handleCommitResponse(loadResult)) {
                                LOG.info(
                                        "commit success cost {}ms, response is {}",
                                        System.currentTimeMillis() - start,
                                        loadResult);
                                return true;
                            }
                        }
                        // Reached for a 200 with no body or a body rejected by
                        // handleCommitResponse.
                        LOG.error("commit failed, cause by: {}", loadResult);
                        throw new CopyLoadException("commit failed, cause by: " + loadResult);
                    }
                });
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers can still observe the interruption
            // after it has been wrapped below.
            Thread.currentThread().interrupt();
        }
        String errMsg = "failed to execute copy, sql=" + copySQL;
        throw new CopyLoadException(errMsg, e);
    }
    return true;
}