in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java [282:334]
/**
 * Uploads the readable content of {@code data} to {@code address} with an HTTP PUT.
 *
 * <p>The request is executed through {@link BackoffAndRetryUtils#backoffAndRetry} under the
 * {@code UPLOAD_FILE} operation; a fresh HTTP client is built for every invocation of the
 * upload action. An attempt counts as successful only when the response status is 200, the
 * response carries an entity, and that entity's body is empty. Any other response makes the
 * attempt throw a {@link CopyLoadException}.
 *
 * @param address the URL the data is PUT to
 * @param data array-backed buffer whose bytes between {@code position()} and {@code limit()}
 *     are sent as the request body
 * @return the request id read from the response headers by {@code getRequestId}, converted
 *     with {@link String#valueOf(Object)}
 * @throws CopyLoadException if the upload ultimately fails; the underlying failure is attached
 *     as the cause
 */
public String uploadToInternalStage(String address, ByteBuffer data)
        throws CopyLoadException {
    // Send exactly the readable window [position, limit) of the backing array. For a buffer
    // whose position is 0 this is the same range as (arrayOffset, limit).
    ByteArrayEntity entity =
            new ByteArrayEntity(
                    data.array(), data.arrayOffset() + data.position(), data.remaining());
    HttpPutBuilder putBuilder = new HttpPutBuilder();
    putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
    HttpPut httpPut = putBuilder.build();
    try {
        Object result =
                BackoffAndRetryUtils.backoffAndRetry(
                        BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE,
                        () -> {
                            // Resources are closed in reverse order: response first, then
                            // the client.
                            try (CloseableHttpClient httpClient = httpClientBuilder.build();
                                    CloseableHttpResponse response =
                                            httpClient.execute(httpPut)) {
                                final int statusCode =
                                        response.getStatusLine().getStatusCode();
                                String requestId = getRequestId(response.getAllHeaders());
                                if (statusCode == 200 && response.getEntity() != null) {
                                    String loadResult =
                                            EntityUtils.toString(response.getEntity());
                                    if (loadResult == null || loadResult.isEmpty()) {
                                        // An empty body on a 200 response means the upload
                                        // finished.
                                        return requestId;
                                    }
                                    // A non-empty body on a 200 response is treated as a
                                    // failure report.
                                    LOG.error(
                                            "upload file failed, requestId is {}, response result: {}",
                                            requestId,
                                            loadResult);
                                    throw new CopyLoadException(
                                            "upload file failed: "
                                                    + response.getStatusLine().toString()
                                                    + ", with requestId "
                                                    + requestId);
                                }
                                throw new CopyLoadException(
                                        "upload file error: "
                                                + response.getStatusLine().toString()
                                                + ", with requestId "
                                                + requestId);
                            }
                        });
        return String.valueOf(result);
    } catch (Exception ex) {
        LOG.error("Failed to upload data to internal stage ", ex);
        CopyLoadException failure =
                new CopyLoadException(
                        "Failed to upload data to internal stage, " + ex.getMessage());
        // Keep the original failure reachable for callers. initCause is used so that this
        // relies only on the (String) constructor already used in this method.
        failure.initCause(ex);
        throw failure;
    }
}