in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [227:269]
public void load(String label, BatchRecordBuffer buffer) throws IOException{
refreshLoadUrl();
ByteBuffer data = buffer.getData();
ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder.setUrl(loadUrl)
.baseAuth(username, password)
.setLabel(label)
.addCommonHeader()
.setEntity(entity)
.addHiddenColumns(executionOptions.getDeletable())
.addProperties(executionOptions.getStreamLoadProp());
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
LOG.info("stream load started for {} on host {}", label, hostPort);
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
}else{
return;
}
}
LOG.error("stream load failed with {}, reason {}, to retry", hostPort, response.getStatusLine().toString());
}catch (Exception ex){
if (retry == executionOptions.getMaxRetries()) {
throw new DorisBatchLoadException("stream load error: ", ex);
}
LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
}
retry++;
// get available backend retry
refreshLoadUrl();
putBuilder.setUrl(loadUrl);
}
}