in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [457:564]
public void load(String label, BatchRecordBuffer buffer) throws IOException {
if (enableGroupCommit) {
label = null;
}
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
BatchBufferHttpEntity entity = new BatchBufferHttpEntity(buffer);
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder
.setUrl(loadUrl)
.baseAuth(username, password)
.setLabel(label)
.addCommonHeader()
.setEntity(entity)
.addHiddenColumns(executionOptions.getDeletable())
.addProperties(executionOptions.getStreamLoadProp());
if (enableGzCompress) {
putBuilder.setEntity(new GzipCompressingEntity(entity));
}
Throwable resEx = new Throwable();
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
if (enableGroupCommit) {
LOG.info("stream load started with group commit on host {}", hostPort);
} else {
LOG.info(
"stream load started for {} on host {}",
putBuilder.getLabel(),
hostPort);
}
try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
String reason = response.getStatusLine().toString();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent =
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
long cacheByteBeforeFlush =
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
LOG.info(
"load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
cacheByteBeforeFlush,
currentCacheBytes.get());
lock.lock();
try {
block.signal();
} finally {
lock.unlock();
}
return;
} else {
String errMsg = null;
if (StringUtils.isBlank(respContent.getMessage())
&& StringUtils.isBlank(respContent.getErrorURL())) {
// sometimes stream load will not return message
errMsg =
String.format(
"stream load error, response is %s",
loadResult);
throw new DorisBatchLoadException(errMsg);
} else {
errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
}
throw new DorisBatchLoadException(errMsg);
}
}
LOG.error(
"stream load failed with {}, reason {}, to retry",
hostPort,
reason);
if (retry == executionOptions.getMaxRetries()) {
resEx = new DorisRuntimeException("stream load failed with: " + reason);
}
} catch (Exception ex) {
resEx = ex;
LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
}
}
retry++;
// get available backend retry
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
putBuilder.setUrl(loadUrl);
putBuilder.setLabel(label + "_" + retry);
try {
Thread.sleep(retry * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
buffer.clear();
buffer = null;
if (retry >= executionOptions.getMaxRetries()) {
throw new DorisBatchLoadException(
"stream load error: " + resEx.getMessage(), resEx);
}
}