in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [229:255]
/**
 * Begins a stream load identified by {@code label}.
 *
 * <p>Marks the next record as the first of the batch, opens {@code recordStream} for input,
 * assembles an HTTP PUT request whose body is read from {@code recordStream}, and hands the
 * request execution to {@code executorService}. The handle returned by the submission is
 * stored in {@code pendingLoadFuture}; this method does not wait for the request to finish.
 *
 * @param label label attached to the request and included in the log messages
 * @throws IOException declared for callers; any exception raised while assembling or
 *     submitting the request is logged at WARN level and then rethrown unchanged
 */
public void startLoad(String label) throws IOException {
    loadBatchFirstRecord = true;
    final HttpPutBuilder requestBuilder = new HttpPutBuilder();
    recordStream.startInput();
    LOG.info("stream load started for {} on host {}", label, hostPort);
    try {
        // The request body is streamed straight from recordStream.
        InputStreamEntity streamBody = new InputStreamEntity(recordStream);
        requestBuilder.setUrl(loadUrlStr)
                .baseAuth(user, passwd)
                .addCommonHeader()
                .addHiddenColumns(enableDelete)
                .setLabel(label)
                .setEntity(streamBody)
                .addProperties(streamLoadProp);
        if (enable2PC) {
            requestBuilder.enable2PC();
        }
        // The request is built and executed inside the submitted task, not on this thread.
        pendingLoadFuture = executorService.submit(() -> {
            LOG.info("start execute load");
            return httpClient.execute(requestBuilder.build());
        });
    } catch (Exception setupFailure) {
        LOG.warn("failed to stream load data with label: " + label, setupFailure);
        throw setupFailure;
    }
}