in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [346:448]
/**
 * Starts a stream load for the current table and submits the HTTP request for asynchronous
 * execution.
 *
 * <p>The method opens the record stream, builds an HTTP PUT request whose body is that stream
 * (optionally gzip-wrapped), and hands the request to {@code executorService}. The resulting
 * future is stored in {@code pendingLoadFuture}; the response is evaluated on the worker
 * thread, not here.
 *
 * <p>If the worker fails, it records the failure in {@code httpException}, interrupts the
 * thread that called this method, and rethrows so the failure also surfaces through the
 * future.
 *
 * @param label label to attach to the load; replaced by {@code null} when group commit is
 *     enabled
 * @param isResume forwarded to {@code recordStream.startInput}; when {@code false},
 *     {@code loadBatchFirstRecord} is set to {@code true}
 * @throws IOException declared for callers; NOTE(review): presumably raised while opening the
 *     record stream — confirm against {@code recordStream.startInput}
 */
public void startLoad(String label, boolean isResume) throws IOException {
    if (enableGroupCommit) {
        // Group commit requests are sent without a label.
        label = null;
    }
    loadBatchFirstRecord = !isResume;
    HttpPutBuilder putBuilder = new HttpPutBuilder();
    recordStream.startInput(isResume);
    if (enableGroupCommit) {
        LOG.info("table {} stream load started with group commit on host {}", table, hostPort);
    } else {
        LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
    }
    this.currentLabel = label;
    try {
        InputStreamEntity entity = new InputStreamEntity(recordStream);
        putBuilder
                .setUrl(loadUrlStr)
                .baseAuth(user, passwd)
                .addCommonHeader()
                .addHiddenColumns(enableDelete)
                .setLabel(label)
                .setEntity(entity)
                .addProperties(streamLoadProp);
        if (enable2PC) {
            putBuilder.enable2PC();
        }
        if (enableGzCompress) {
            // Replace the plain entity with a gzip-compressing wrapper around it.
            putBuilder.setEntity(new GzipCompressingEntity(entity));
        }
        // Built outside the lambda: `label` is reassigned above, so it is not effectively
        // final and cannot be captured by the lambda directly.
        String executeMessage;
        if (enableGroupCommit) {
            executeMessage = "table " + table + " start execute load with group commit";
        } else {
            executeMessage = "table " + table + " start execute load for label " + label;
        }
        // Captured so the worker can interrupt the thread that started the load.
        Thread mainThread = Thread.currentThread();
        pendingLoadFuture =
                executorService.submit(
                        () -> {
                            LOG.info(executeMessage);
                            try {
                                CloseableHttpResponse execute =
                                        httpClient.execute(putBuilder.build());
                                RespContent respContent = handlePreCommitResponse(execute);
                                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                                    if (enable2PC
                                            && LoadStatus.LABEL_ALREADY_EXIST.equals(
                                                    respContent.getStatus())
                                            && !JOB_EXIST_FINISHED.equals(
                                                    respContent.getExistingJobStatus())) {
                                        // The label is already taken by a job that has not
                                        // finished: abort that transaction and signal a retry.
                                        LOG.info(
                                                "try to abort {} cause status {}, exist job status {} ",
                                                respContent.getLabel(),
                                                respContent.getStatus(),
                                                respContent.getExistingJobStatus());
                                        abortLabelExistTransaction(respContent);
                                        throw new LabelAlreadyExistsException(
                                                "Exist label abort finished, retry");
                                    } else {
                                        String errMsg =
                                                String.format(
                                                        "table %s.%s stream load error: %s, see more in %s",
                                                        getDb(),
                                                        getTable(),
                                                        respContent.getMessage(),
                                                        respContent.getErrorURL());
                                        LOG.error("Failed to load, {}", errMsg);
                                        throw new DorisRuntimeException(errMsg);
                                    }
                                }
                                return respContent;
                            } catch (NoRouteToHostException nex) {
                                LOG.error("Failed to connect, cause ", nex);
                                httpException = nex;
                                mainThread.interrupt();
                                // Keep the original exception as the cause so its stack trace
                                // is not lost when the failure is unwrapped from the future.
                                throw new DorisRuntimeException(
                                        "No Route to Host to "
                                                + hostPort
                                                + ", exception: "
                                                + nex,
                                        nex);
                            } catch (Exception e) {
                                LOG.error("Failed to execute load, cause ", e);
                                httpException = e;
                                // When an HTTP error occurs, the main thread should be
                                // interrupted to prevent blocking
                                mainThread.interrupt();
                                throw e;
                            }
                        });
    } catch (Exception e) {
        String err;
        if (enableGroupCommit) {
            err = "failed to stream load data with group commit";
        } else {
            err = "failed to stream load data with label: " + label;
        }
        LOG.warn(err, e);
        throw e;
    }
}